//! Read set tracking for an active transaction
use std::{
collections::BTreeMap,
sync::LazyLock,
};
use cmd_util::env::env_config;
use common::{
bootstrap_model::index::database_index::IndexedFields,
components::ComponentPath,
document::{
IndexKeyBuffer,
PackedDocument,
},
document_index_keys::{
DocumentIndexKeyValue,
DocumentIndexKeys,
},
interval::{
Interval,
IntervalSet,
},
knobs::{
TRANSACTION_MAX_READ_SET_INTERVALS,
TRANSACTION_MAX_READ_SIZE_BYTES,
TRANSACTION_MAX_READ_SIZE_ROWS,
},
static_span,
types::{
PersistenceVersion,
TabletIndexName,
Timestamp,
},
value::ResolvedDocumentId,
};
use errors::ErrorMetadata;
use search::QueryReads as SearchQueryReads;
use usage_tracking::FunctionUsageTracker;
use value::{
heap_size::{
HeapSize,
WithHeapSize,
},
TableName,
TabletId,
};
#[cfg(doc)]
use crate::Transaction;
use crate::{
database::{
ConflictingRead,
ConflictingReadWithWriteSource,
},
metrics,
stack_traces::StackTrace,
write_log::{
DocumentIndexKeysUpdate,
PackedDocumentUpdate,
WriteSource,
},
};
pub const OVER_LIMIT_HELP: &str = "Consider using smaller limits in your queries, paginating your \
queries, or using indexed queries with selective index range \
expressions.";
/// If set to 'true', then collect backtraces of every database read in order
/// to help debug OCC errors. Collecting stack traces is expensive and should
/// only be used in development.
static READ_SET_CAPTURE_BACKTRACES: LazyLock<bool> =
LazyLock::new(|| env_config("READ_SET_CAPTURE_BACKTRACES", false));
#[cfg_attr(any(test, feature = "testing"), derive(PartialEq, Eq))]
#[derive(Debug, Clone)]
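/// The intervals read from a single database index, along with the indexed
/// fields used to compute index keys and, optionally, stack traces recording
/// where each interval was read (for debugging OCC conflicts).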
pub struct IndexReads {
pub fields: IndexedFields,
pub intervals: IntervalSet,
pub stack_traces: Option<Vec<(Interval, StackTrace)>>,
}
impl HeapSize for IndexReads {
fn heap_size(&self) -> usize {
self.fields.heap_size() + self.intervals.heap_size()
}
}
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "testing"), derive(PartialEq, Eq))]
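/// All reads taken by a transaction or subscription: indexed range reads and
/// search reads, each keyed by the tablet index they were issued against.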
pub struct ReadSet {
indexed: WithHeapSize<BTreeMap<TabletIndexName, IndexReads>>,
search: WithHeapSize<BTreeMap<TabletIndexName, SearchQueryReads>>,
}
impl HeapSize for ReadSet {
fn heap_size(&self) -> usize {
self.indexed.heap_size() + self.search.heap_size()
}
}
impl ReadSet {
pub fn empty() -> Self {
Self {
indexed: WithHeapSize::default(),
search: WithHeapSize::default(),
}
}
pub fn new(
indexed: BTreeMap<TabletIndexName, IndexReads>,
search: BTreeMap<TabletIndexName, SearchQueryReads>,
) -> Self {
Self {
indexed: indexed.into(),
search: search.into(),
}
}
/// Iterate over the recorded range reads, keyed by index name.
pub fn iter_indexed(&self) -> impl Iterator<Item = (&TabletIndexName, &IndexReads)> {
self.indexed.iter()
}
#[cfg(test)]
pub fn index_reads_for_test(&self, index_name: &TabletIndexName) -> IntervalSet {
self.indexed
.get(index_name)
.map(|reads| reads.intervals.clone())
.unwrap_or_default()
}
pub fn iter_search(&self) -> impl Iterator<Item = (&TabletIndexName, &SearchQueryReads)> {
self.search.iter()
}
pub fn consume(
self,
) -> (
impl Iterator<Item = (TabletIndexName, IndexReads)>,
impl Iterator<Item = (TabletIndexName, SearchQueryReads)>,
) {
(self.indexed.into_iter(), self.search.into_iter())
}
/// Determine whether a mutation to a document overlaps with the read set.
///
/// `reusable_buffer` is passed as a parameter to avoid repeated
/// allocations.
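///
/// A minimal usage sketch (not compiled; `read_set` and `packed_doc` are
/// assumed to have been built elsewhere):
///
/// ```ignore
/// let mut buffer = IndexKeyBuffer::new();
/// if let Some(conflict) = read_set.overlaps_document(
///     &packed_doc,
///     PersistenceVersion::default(),
///     &mut buffer,
/// ) {
///     // `conflict.index` names the index whose read range this write hits.
/// }
/// ```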
pub fn overlaps_document(
&self,
document: &PackedDocument,
persistence_version: PersistenceVersion,
reusable_buffer: &mut IndexKeyBuffer,
) -> Option<ConflictingRead> {
for (
index,
IndexReads {
fields,
intervals,
stack_traces,
},
) in iter_indexes_for_table(&self.indexed, document.id().tablet_id)
{
let index_key = document.index_key(fields, persistence_version, reusable_buffer);
if intervals.contains(index_key) {
let stack_traces = stack_traces.as_ref().map(|st| {
st.iter()
.filter_map(|(interval, trace)| {
if interval.contains(index_key) {
Some(trace.clone())
} else {
None
}
})
.collect()
});
return Some(ConflictingRead {
index: index.clone(),
id: document.id(),
stack_traces,
});
}
}
for (index, search_reads) in iter_indexes_for_table(&self.search, document.id().tablet_id) {
if search_reads.overlaps_document(document) {
return Some(ConflictingRead {
index: index.clone(),
id: document.id(),
stack_traces: None,
});
}
}
None
}
#[cfg(test)]
pub fn overlaps_document_for_test(
&self,
document: &PackedDocument,
persistence_version: PersistenceVersion,
) -> Option<ConflictingRead> {
self.overlaps_document(document, persistence_version, &mut IndexKeyBuffer::new())
}
/// Determine whether a mutation to a document overlaps with the read set.
/// Similar to `overlaps_document` but takes the index keys instead of the
/// full document.
pub fn overlaps_index_keys(
&self,
id: ResolvedDocumentId,
index_keys: &DocumentIndexKeys,
) -> Option<ConflictingRead> {
// Standard indexes
for (
index,
IndexReads {
intervals,
stack_traces,
..
},
) in iter_indexes_for_table(&self.indexed, id.tablet_id)
{
let Some(DocumentIndexKeyValue::Standard(index_key)) = index_keys.get(index) else {
metrics::log_missing_index_key_staleness();
continue;
};
if intervals.contains(index_key) {
let stack_traces = stack_traces.as_ref().map(|st| {
st.iter()
.filter_map(|(interval, trace)| {
if interval.contains(index_key) {
Some(trace.clone())
} else {
None
}
})
.collect()
});
return Some(ConflictingRead {
index: index.clone(),
id,
stack_traces,
});
}
}
// Search indexes
for (index, search_reads) in iter_indexes_for_table(&self.search, id.tablet_id) {
let Some(DocumentIndexKeyValue::Search(value)) = index_keys.get(index) else {
metrics::log_missing_search_index_key_staleness();
continue;
};
if search_reads.overlaps_search_index_key_value(value) {
return Some(ConflictingRead {
index: index.clone(),
id,
stack_traces: None,
});
}
}
None
}
/// `writes_overlap_docs` is the core logic for detecting whether a
/// transaction or subscription intersects a commit.
/// If a write transaction intersects, it will be retried to maintain
/// serializability. If a subscription intersects, it will be rerun and the
/// result sent to all clients.
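///
/// A hedged sketch of how a commit-time check might call this (the write log
/// iterator shape and the `iter_since` accessor are assumptions for
/// illustration, not real API):
///
/// ```ignore
/// if let Some(conflict) = read_set.writes_overlap_docs(
///     write_log.iter_since(begin_ts), // hypothetical accessor
///     PersistenceVersion::default(),
/// ) {
///     // Conflict: retry the transaction or re-run the subscription.
/// }
/// ```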
#[fastrace::trace]
pub fn writes_overlap_docs<'a>(
&self,
updates: impl Iterator<
Item = (
&'a Timestamp,
impl Iterator<Item = &'a (ResolvedDocumentId, PackedDocumentUpdate)>,
&'a WriteSource,
),
>,
persistence_version: PersistenceVersion,
) -> Option<ConflictingReadWithWriteSource> {
let mut buffer = IndexKeyBuffer::new();
for (update_ts, updates, write_source) in updates {
for (_, update) in updates {
if let Some(ref document) = update.new_document {
if let Some(conflicting_read) =
self.overlaps_document(document, persistence_version, &mut buffer)
{
return Some(ConflictingReadWithWriteSource {
read: conflicting_read,
write_source: write_source.clone(),
write_ts: *update_ts,
});
}
}
if let Some(ref prev_value) = update.old_document {
if let Some(conflicting_read) =
self.overlaps_document(prev_value, persistence_version, &mut buffer)
{
return Some(ConflictingReadWithWriteSource {
read: conflicting_read,
write_source: write_source.clone(),
write_ts: *update_ts,
});
}
}
}
}
None
}
/// Equivalent to `writes_overlap_docs`, but works from precomputed index
/// keys rather than reading the full documents.
#[fastrace::trace]
pub fn writes_overlap_index_keys<'a>(
&self,
updates: impl Iterator<
Item = (
&'a Timestamp,
impl Iterator<Item = &'a (ResolvedDocumentId, DocumentIndexKeysUpdate)>,
&'a WriteSource,
),
>,
) -> Option<ConflictingReadWithWriteSource> {
for (update_ts, updates, write_source) in updates {
for (id, update) in updates {
if let Some(ref document) = update.new_document_keys {
if let Some(conflicting_read) = self.overlaps_index_keys(*id, document) {
return Some(ConflictingReadWithWriteSource {
read: conflicting_read,
write_source: write_source.clone(),
write_ts: *update_ts,
});
}
}
if let Some(ref document) = update.old_document_keys {
if let Some(conflicting_read) = self.overlaps_index_keys(*id, document) {
return Some(ConflictingReadWithWriteSource {
read: conflicting_read,
write_source: write_source.clone(),
write_ts: *update_ts,
});
}
}
}
}
None
}
}
/// Iterates over just those pairs in `map` whose table matches `tablet_id`.
fn iter_indexes_for_table<T>(
map: &BTreeMap<TabletIndexName, T>,
tablet_id: TabletId,
) -> impl Iterator<Item = (&TabletIndexName, &T)> {
// uses the fact that TabletIndexName is ordered by TabletId first,
// then descriptor
map.range(TabletIndexName::min_for_table(tablet_id)..)
.take_while(move |(index, _)| *index.table() == tablet_id)
}
/// Tracks the read set for the current transaction. Records successful reads as
/// well as missing documents so we can ensure future reads in this transaction
/// are consistent against the current snapshot.
///
/// [`Transaction`] keeps this read set up to date when accessing documents
/// or the index. We want to minimize the amount of code that updates this state
/// so we avoid missing an update.
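///
/// A minimal lifecycle sketch (not compiled; `index_name`, `fields`, and
/// `interval` stand in for values built by the transaction machinery):
///
/// ```ignore
/// let mut reads = TransactionReadSet::new();
/// // Take a read dependency on one index range.
/// reads.record_indexed_directly(index_name, fields, interval)?;
/// // At commit time, extract the accumulated reads for conflict checking.
/// let read_set = reads.into_read_set();
/// ```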
#[derive(Debug, Clone)]
pub struct TransactionReadSet {
read_set: ReadSet,
// Pre-computed sum of the sizes of all the `IntervalSet`s in the read set.
num_intervals: usize,
user_tx_size: TransactionReadSize,
system_tx_size: TransactionReadSize,
}
#[cfg(any(test, feature = "testing"))]
impl PartialEq for TransactionReadSet {
fn eq(&self, other: &Self) -> bool {
self.read_set.eq(&other.read_set)
&& self.num_intervals.eq(&other.num_intervals)
&& self.user_tx_size.eq(&other.user_tx_size)
&& self.system_tx_size.eq(&other.system_tx_size)
}
}
#[derive(Clone, Debug, Default, Eq, PartialEq, derive_more::Add, derive_more::AddAssign)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
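/// Running totals of how much a transaction has read, used to enforce the
/// per-function read limits below.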
pub struct TransactionReadSize {
// Sum of doc.size() for all documents read.
pub total_document_size: usize,
// Count of all documents read.
pub total_document_count: usize,
}
impl TransactionReadSet {
/// Create an empty read set.
pub fn new() -> Self {
Self {
read_set: ReadSet::empty(),
num_intervals: 0,
user_tx_size: TransactionReadSize::default(),
system_tx_size: TransactionReadSize::default(),
}
}
pub fn into_read_set(self) -> ReadSet {
self.read_set
}
pub fn read_set(&self) -> &ReadSet {
&self.read_set
}
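/// Add `intervals` to the reads recorded for `index_name`, returning how
/// many intervals that index had before and after the merge.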
fn _record_indexed(
&mut self,
index_name: TabletIndexName,
fields: IndexedFields,
intervals: impl IntoIterator<Item = Interval>,
) -> (usize, usize) {
self.read_set.indexed.mutate_entry_or_insert_with(
index_name.clone(),
|| IndexReads {
fields: fields.clone(),
intervals: IntervalSet::new(),
stack_traces: (*READ_SET_CAPTURE_BACKTRACES).then_some(vec![]),
},
|reads| {
let IndexReads {
intervals: range_set,
stack_traces,
fields: existing_fields,
} = reads;
assert_eq!(
*existing_fields, fields,
"trying to change index fields for index {index_name:?}!"
);
let range_num_intervals_before = range_set.len();
for interval in intervals {
if let Some(stack_traces) = stack_traces.as_mut() {
stack_traces.push((interval.clone(), StackTrace::new()));
}
range_set.add(interval);
}
let range_num_intervals_after = range_set.len();
(range_num_intervals_before, range_num_intervals_after)
},
)
}
/// Call `record_indexed_derived` to take a read dependency when the user
/// didn't directly initiate the read and the read didn't go to persistence,
/// but we still need the dependency for correctness.
/// For example, when writing to a table, take a derived read on the table
/// to make sure it still exists.
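///
/// A minimal sketch (not compiled; `reads`, `by_id_index`, and `fields` are
/// assumed to come from the transaction and the table's index metadata, and
/// `Interval::all()` is assumed to denote the full index range):
///
/// ```ignore
/// // Depend on the table as a whole, e.g. to ensure it still exists when
/// // writing to it.
/// reads.record_indexed_derived(by_id_index, fields, Interval::all());
/// ```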
pub fn record_indexed_derived(
&mut self,
index_name: TabletIndexName,
fields: IndexedFields,
interval: Interval,
) {
self._record_indexed(index_name, fields, [interval]);
}
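/// Fold a previously extracted `ReadSet`, along with its precomputed
/// interval count and read sizes, into this transaction's read set.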
pub fn merge(
&mut self,
reads: ReadSet,
num_intervals: usize,
user_tx_size: TransactionReadSize,
system_tx_size: TransactionReadSize,
) {
let (index_reads, search_reads) = reads.consume();
for (index_name, index_reads) in index_reads {
self._record_indexed(index_name, index_reads.fields, index_reads.intervals.iter());
}
for (index_name, search_reads) in search_reads {
self.record_search(index_name, search_reads);
}
self.num_intervals += num_intervals;
self.user_tx_size += user_tx_size;
self.system_tx_size += system_tx_size;
}
pub fn record_read_document(
&mut self,
component_path: ComponentPath,
table_name: TableName,
document_size: usize,
usage_tracker: &FunctionUsageTracker,
is_virtual_table: bool,
) -> anyhow::Result<()> {
// Database bandwidth for document reads
let is_system_table = table_name.is_system() && !is_virtual_table;
usage_tracker.track_database_egress_size(
component_path.clone(),
table_name.to_string(),
document_size as u64,
is_system_table,
);
usage_tracker.track_database_egress_rows(
component_path,
table_name.to_string(),
1,
is_system_table,
);
let tx_size = if is_system_table {
&mut self.system_tx_size
} else {
&mut self.user_tx_size
};
// We always increment the size first: even if we throw, we want the size
// to reflect the read, so that we can tell that we threw and not issue a
// warning.
tx_size.total_document_count += 1;
tx_size.total_document_size += document_size;
if !is_system_table {
anyhow::ensure!(
tx_size.total_document_count <= *TRANSACTION_MAX_READ_SIZE_ROWS,
ErrorMetadata::pagination_limit(
"TooManyDocumentsRead",
format!(
"Too many documents read in a single function execution (limit: {}). \
{OVER_LIMIT_HELP}",
*TRANSACTION_MAX_READ_SIZE_ROWS,
)
),
);
anyhow::ensure!(
tx_size.total_document_size <= *TRANSACTION_MAX_READ_SIZE_BYTES,
ErrorMetadata::pagination_limit(
"TooManyBytesRead",
format!(
"Too many bytes read in a single function execution (limit: {} bytes). \
{OVER_LIMIT_HELP}",
*TRANSACTION_MAX_READ_SIZE_BYTES,
)
),
);
}
Ok(())
}
pub fn record_read_system_document(&mut self, document_size: usize) {
self.system_tx_size.total_document_count += 1;
self.system_tx_size.total_document_size += document_size;
}
pub fn record_indexed_directly(
&mut self,
index_name: TabletIndexName,
fields: IndexedFields,
interval: Interval,
) -> anyhow::Result<()> {
let _s = static_span!();
let (num_intervals_before, num_intervals_after) =
self._record_indexed(index_name, fields, [interval]);
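// Adding intervals may merge with ones already recorded for this index, so
// replace its previous contribution to the global count with the new one.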
self.num_intervals = self.num_intervals.saturating_sub(num_intervals_before);
self.num_intervals += num_intervals_after;
if self.num_intervals > *TRANSACTION_MAX_READ_SET_INTERVALS {
anyhow::bail!(
anyhow::anyhow!("top three: {}", self.top_three_intervals()).context(
ErrorMetadata::pagination_limit(
"TooManyReads",
format!(
"Too many reads in a single function execution (limit: {}). \
{OVER_LIMIT_HELP}",
*TRANSACTION_MAX_READ_SET_INTERVALS,
),
)
)
);
}
Ok(())
}
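/// Summarize the three indexes with the most recorded intervals, used as
/// context in the `TooManyReads` error above.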
pub fn top_three_intervals(&self) -> String {
let mut intervals: Vec<_> = self
.read_set
.indexed
.iter()
.map(|(index, reads)| (reads.intervals.len(), index))
.collect();
intervals.sort_by_key(|(len, _)| *len);
let top_three = intervals
.iter()
.rev()
.take(3)
.map(|(amt, index)| format!("{index}: {amt}"))
.collect::<Vec<_>>();
top_three.join(",")
}
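/// Record the text query terms and filter conditions read against
/// `index_name`, merging them with any existing search reads on that index.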
pub fn record_search(&mut self, index_name: TabletIndexName, search_reads: SearchQueryReads) {
self.read_set.search.mutate_entry_or_insert_with(
index_name,
SearchQueryReads::empty,
|existing_reads| existing_reads.merge(search_reads),
);
}
pub fn num_intervals(&self) -> usize {
self.num_intervals
}
pub fn user_tx_size(&self) -> &TransactionReadSize {
&self.user_tx_size
}
pub fn system_tx_size(&self) -> &TransactionReadSize {
&self.system_tx_size
}
}
#[cfg(any(test, feature = "testing"))]
impl proptest::arbitrary::Arbitrary for ReadSet {
type Parameters = ();
type Strategy = impl proptest::strategy::Strategy<Value = ReadSet>;
fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
use proptest::prelude::*;
#[derive(Debug, proptest_derive::Arbitrary)]
struct GeneratedReads {
#[proptest(
strategy = "prop::collection::vec(any::<(TabletIndexName, IndexedFields, \
IntervalSet)>(), 0..4)"
)]
entries: Vec<(TabletIndexName, IndexedFields, IntervalSet)>,
#[proptest(strategy = "prop::collection::vec(any::<(TabletIndexName, \
SearchQueryReads)>(), 0..4)")]
search: Vec<(TabletIndexName, SearchQueryReads)>,
}
any::<GeneratedReads>().prop_map(|generated_reads| {
let indexed = generated_reads
.entries
.into_iter()
.map(|(index_name, fields, intervals)| {
(
index_name,
IndexReads {
fields,
intervals,
stack_traces: None,
},
)
})
.collect::<BTreeMap<_, _>>();
let search = generated_reads
.search
.into_iter()
.collect::<BTreeMap<_, _>>();
Self {
indexed: indexed.into(),
search: search.into(),
}
})
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use common::{
assert_obj,
document::{
CreationTime,
PackedDocument,
ResolvedDocument,
},
query::FilterValue,
testing::TestIdGenerator,
types::{
IndexDescriptor,
PersistenceVersion,
TabletIndexName,
},
value::{
ConvexValue,
FieldPath,
ResolvedDocumentId,
},
};
use search::{
query::{
FuzzyDistance,
TextQueryTerm,
},
FilterConditionRead,
QueryReads as SearchQueryReads,
TextQueryTermRead,
};
use value::val;
use super::TransactionReadSet;
use crate::ReadSet;
fn create_document_with_one_field(
id: ResolvedDocumentId,
field_name: &str,
value: ConvexValue,
) -> anyhow::Result<ResolvedDocument> {
ResolvedDocument::new(
id,
CreationTime::ONE,
assert_obj!(
field_name => value
),
)
}
fn create_document_with_extra_field(
id: ResolvedDocumentId,
field_name: &str,
value: ConvexValue,
) -> anyhow::Result<ResolvedDocument> {
ResolvedDocument::new(
id,
CreationTime::ONE,
assert_obj!(
field_name => value,
"extraField" => ConvexValue::String("word".to_string().try_into()?),
),
)
}
#[test]
fn search_fuzzy_text_no_prefix_0_distance_reads() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let field_path = "textField";
let search_reads = SearchQueryReads::new(
vec![TextQueryTermRead {
field_path: FieldPath::from_str(field_path)?,
term: TextQueryTerm::Fuzzy {
max_distance: FuzzyDistance::Zero,
token: "word".to_string(),
prefix: false,
},
}]
.into(),
vec![].into(),
);
reads.record_search(index_name, search_reads);
let read_set = reads.into_read_set();
let id = id_generator.user_generate(&table_name);
assert!(read_set_overlaps(
id,
&read_set,
field_path,
// If "word" is a token, it overlaps.
"Text containing word and other stuff."
)?);
assert!(!read_set_overlaps(
id,
&read_set,
field_path,
// If "word" is just a substring, it does not overlap.
"This text doesn't have the keyword."
)?);
Ok(())
}
#[test]
fn search_fuzzy_text_no_prefix_1_distance_reads() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let field_path = "textField";
let search_reads = SearchQueryReads::new(
vec![TextQueryTermRead {
field_path: FieldPath::from_str(field_path)?,
term: TextQueryTerm::Fuzzy {
max_distance: FuzzyDistance::One,
token: "wod".to_string(),
prefix: false,
},
}]
.into(),
vec![].into(),
);
reads.record_search(index_name, search_reads);
let read_set = reads.into_read_set();
let id = id_generator.user_generate(&table_name);
assert!(!read_set_overlaps(
id,
&read_set,
field_path,
// If "word" is just a substring, it does not overlap.
"This text doesn't have the keyword."
)?);
Ok(())
}
#[test]
fn search_fuzzy_text_no_prefix_2_distance_reads() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let field_path = "textField";
let search_reads = SearchQueryReads::new(
vec![TextQueryTermRead {
field_path: FieldPath::from_str(field_path)?,
term: TextQueryTerm::Fuzzy {
max_distance: FuzzyDistance::Two,
token: "word".to_string(),
prefix: false,
},
}]
.into(),
vec![].into(),
);
reads.record_search(index_name, search_reads);
let read_set = reads.into_read_set();
let id = id_generator.user_generate(&table_name);
assert!(read_set_overlaps(
id,
&read_set,
field_path,
"Text containing word and other stuff."
)?);
assert!(!read_set_overlaps(
id,
&read_set,
field_path,
"This text doesn't have the keyword."
)?);
Ok(())
}
#[test]
fn search_fuzzy_text_prefix_0_distance_reads() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let field_path = "textField";
let search_reads = SearchQueryReads::new(
vec![TextQueryTermRead {
field_path: FieldPath::from_str(field_path)?,
term: TextQueryTerm::Fuzzy {
max_distance: FuzzyDistance::Zero,
token: "word".to_string(),
prefix: true,
},
}]
.into(),
vec![].into(),
);
reads.record_search(index_name, search_reads);
let read_set = reads.into_read_set();
let id = id_generator.user_generate(&table_name);
assert!(read_set_overlaps(
id,
&read_set,
field_path,
// If "word.*" is a token, it overlaps.
"Text containing words and other stuff."
)?);
assert!(!read_set_overlaps(
id,
&read_set,
field_path,
// If "word.*" is just a substring, it does not overlap.
"This text doesn't have the keyword."
)?);
Ok(())
}
#[test]
fn search_fuzzy_text_prefix_1_distance_reads() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let field_path = "textField";
let search_reads = SearchQueryReads::new(
vec![TextQueryTermRead {
field_path: FieldPath::from_str(field_path)?,
term: TextQueryTerm::Fuzzy {
max_distance: FuzzyDistance::One,
token: "wrd".to_string(),
prefix: true,
},
}]
.into(),
vec![].into(),
);
reads.record_search(index_name, search_reads);
let read_set = reads.into_read_set();
let id = id_generator.user_generate(&table_name);
assert!(read_set_overlaps(
id,
&read_set,
field_path,
// If "wrd.*" is a token, it overlaps.
"Text containing wrdsythings and other stuff."
)?);
assert!(!read_set_overlaps(
id,
&read_set,
field_path,
// If "word.*" is just a substring, it does not overlap.
"This text doesn't have keyword."
)?);
Ok(())
}
#[test]
fn search_fuzzy_text_prefix_2_distance_reads() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let field_path = "textField";
let search_reads = SearchQueryReads::new(
vec![TextQueryTermRead {
field_path: FieldPath::from_str(field_path)?,
term: TextQueryTerm::Fuzzy {
max_distance: FuzzyDistance::Two,
token: "word".to_string(),
prefix: true,
},
}]
.into(),
vec![].into(),
);
reads.record_search(index_name, search_reads);
let read_set = reads.into_read_set();
let id = id_generator.user_generate(&table_name);
assert!(read_set_overlaps(
id,
&read_set,
field_path,
// If "word.*" is a token, it overlaps.
"Text containing wordsythings and other stuff."
)?);
// This would fail if `prefix` were false.
assert!(read_set_overlaps(
id,
&read_set,
field_path,
"Text containing wordddd and other stuff."
)?);
assert!(!read_set_overlaps(
id,
&read_set,
field_path,
// If "word.*" is just a substring, it does not overlap.
"This text doesn't have keyword."
)?);
Ok(())
}
fn read_set_overlaps(
id: ResolvedDocumentId,
read_set: &ReadSet,
field_name: &str,
document_text: &str,
) -> anyhow::Result<bool> {
let doc_without_word = create_document_with_one_field(id, field_name, val!(document_text))?;
Ok(read_set
.overlaps_document_for_test(
&PackedDocument::pack(&doc_without_word),
PersistenceVersion::default(),
)
.is_some())
}
#[test]
fn test_search_exact_text_reads() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let search_reads = SearchQueryReads::new(
vec![TextQueryTermRead {
field_path: FieldPath::from_str("textField")?,
term: TextQueryTerm::Exact("word".to_string()),
}]
.into(),
vec![].into(),
);
reads.record_search(index_name.clone(), search_reads);
let read_set = reads.into_read_set();
// If "word" is a token, it overlaps.
let doc_with_word = create_document_with_one_field(
id_generator.user_generate(&table_name),
"textField",
val!("Text containing word and other stuff."),
)?;
assert_eq!(
read_set
.overlaps_document_for_test(
&PackedDocument::pack(&doc_with_word),
PersistenceVersion::default()
)
.unwrap()
.index,
index_name
);
// If "word" is just a substring, it does not.
let doc_without_word = create_document_with_one_field(
id_generator.user_generate(&table_name),
"textField",
val!("This text doesn't have the keyword."),
)?;
assert_eq!(
read_set.overlaps_document_for_test(
&PackedDocument::pack(&doc_without_word),
PersistenceVersion::default()
),
None
);
Ok(())
}
#[test]
fn test_search_filter_reads_empty_query() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let search_reads = SearchQueryReads::new(
vec![].into(),
vec![FilterConditionRead::Must(
FieldPath::from_str("nullField")?,
FilterValue::from_search_value(Some(&ConvexValue::Null)),
)]
.into(),
);
reads.record_search(index_name.clone(), search_reads);
let read_set = reads.into_read_set();
// If "nullField" is Null, it overlaps.
let doc_with_explicit_null = create_document_with_one_field(
id_generator.user_generate(&table_name),
"nullField",
ConvexValue::Null,
)?;
assert_eq!(
read_set
.overlaps_document_for_test(
&PackedDocument::pack(&doc_with_explicit_null),
PersistenceVersion::default()
)
.unwrap()
.index,
index_name
);
// If "nullField" is not present, it does not overlap.
let doc_with_missing_field = create_document_with_one_field(
id_generator.user_generate(&table_name),
"unrelatedField",
ConvexValue::Null,
)?;
assert_eq!(
read_set.overlaps_document_for_test(
&PackedDocument::pack(&doc_with_missing_field),
PersistenceVersion::default()
),
None
);
// If "nullField" is a different type, it does not overlap.
let doc_with_implicit_null = create_document_with_one_field(
id_generator.user_generate(&table_name),
"nullField",
ConvexValue::Int64(123),
)?;
assert_eq!(
read_set.overlaps_document_for_test(
&PackedDocument::pack(&doc_with_implicit_null),
PersistenceVersion::default()
),
None
);
Ok(())
}
#[test]
fn test_search_filter_reads() -> anyhow::Result<()> {
let mut reads = TransactionReadSet::new();
let mut id_generator = TestIdGenerator::new();
let table_name = "mytable".parse()?;
let table_id = id_generator.user_table_id(&table_name);
let index_name =
TabletIndexName::new(table_id.tablet_id, IndexDescriptor::new("search_index")?)?;
let search_reads = SearchQueryReads::new(
vec![TextQueryTermRead {
field_path: FieldPath::from_str("extraField")?,
term: TextQueryTerm::Fuzzy {
max_distance: FuzzyDistance::Zero,
token: "word".to_string(),
prefix: false,
},
}]
.into(),
vec![FilterConditionRead::Must(
FieldPath::from_str("nullField")?,
FilterValue::from_search_value(Some(&ConvexValue::Null)),
)]
.into(),
);
reads.record_search(index_name.clone(), search_reads);
let read_set = reads.into_read_set();
// If "nullField" is Null, it overlaps.
let doc_with_explicit_null = create_document_with_extra_field(
id_generator.user_generate(&table_name),
"nullField",
ConvexValue::Null,
)?;
assert_eq!(
read_set
.overlaps_document_for_test(
&PackedDocument::pack(&doc_with_explicit_null),
PersistenceVersion::default()
)
.unwrap()
.index,
index_name
);
// If "nullField" is not present, it does not overlap.
let doc_with_missing_field = create_document_with_extra_field(
id_generator.user_generate(&table_name),
"unrelatedField",
ConvexValue::Null,
)?;
assert_eq!(
read_set.overlaps_document_for_test(
&PackedDocument::pack(&doc_with_missing_field),
PersistenceVersion::default()
),
None
);
// If "nullField" is a different type, it does not overlap.
let doc_with_implicit_null = create_document_with_extra_field(
id_generator.user_generate(&table_name),
"nullField",
ConvexValue::Int64(123),
)?;
assert_eq!(
read_set.overlaps_document_for_test(
&PackedDocument::pack(&doc_with_implicit_null),
PersistenceVersion::default()
),
None
);
Ok(())
}
}