Skip to main content
Glama

Convex MCP server

Official
by get-convex
table_iteration.rs46.4 kB
use std::{ cmp, collections::{ BTreeMap, BTreeSet, }, ops::Deref, sync::Arc, }; use anyhow::Context; use common::{ bootstrap_model::index::database_index::IndexedFields, document::ResolvedDocument, index::{ IndexKey, IndexKeyBytes, }, interval::Interval, knobs::DOCUMENTS_IN_MEMORY, persistence::{ new_static_repeatable_recent, DocumentLogEntry, DocumentPrevTsQuery, LatestDocument, PersistenceReader, RepeatablePersistence, RetentionValidator, TimestampRange, }, query::{ CursorPosition, Order, }, runtime::Runtime, try_chunks::TryChunksExt, types::{ IndexId, RepeatableTimestamp, Timestamp, }, value::ResolvedDocumentId, }; use futures::{ pin_mut, stream, Stream, StreamExt, TryStreamExt, }; use futures_async_stream::try_stream; use value::{ InternalDocumentId, InternalId, TabletId, }; /// A cursor for use while scanning a table by ID. /// /// The key is the last element processed thus far. #[derive(Clone, Debug)] pub struct TableScanCursor { pub index_key: Option<CursorPosition>, } impl TableScanCursor { pub fn interval(&self) -> Interval { match &self.index_key { Some(index_key) => { let (_, remaining) = Interval::all().split(index_key.clone(), Order::Asc); remaining }, None => Interval::all(), } } pub fn advance(&mut self, index_key: CursorPosition) -> anyhow::Result<()> { if let Some(existing_key) = &self.index_key { anyhow::ensure!(index_key > existing_key); } self.index_key = Some(index_key); Ok(()) } } fn cursor_has_walked(cursor: Option<&CursorPosition>, key: &IndexKeyBytes) -> bool { match cursor { None => false, Some(CursorPosition::End) => true, Some(CursorPosition::After(cursor)) => key <= cursor, } } pub struct TableIterator<RT: Runtime> { inner: TableIteratorInner<RT>, } impl<RT: Runtime> TableIterator<RT> { pub fn new( runtime: RT, snapshot_ts: RepeatableTimestamp, persistence: Arc<dyn PersistenceReader>, retention_validator: Arc<dyn RetentionValidator>, page_size: usize, ) -> Self { Self { inner: TableIteratorInner { runtime, persistence, retention_validator, page_size, snapshot_ts, }, } } pub fn with_page_size(mut self, page_size: usize) -> Self { self.inner.page_size = page_size; self } /// Create a `MultiTableIterator`, which can iterate multiple tables at the /// same snapshot timestamp. This is more efficient than creating a separate /// `TableIterator` for each table since each table can share the work of /// iterating the document log. /// /// The iterator will only be able to visit those tables passed to `multi`. /// Trying to visit a table that wasn't initially specified will error. /// /// The iterator keeps some state in memory for each of the `tables` /// provided. To reduce memory usage, you can call /// [`MultiTableIterator::unregister_table`] if you know that a given table /// will not be iterated again. /// /// Example: /// ```no_run /// # async fn iterate_example<RT: common::runtime::Runtime>( /// # db: &database::Database<RT>, /// # [table1, table2, table3]: [value::TabletId; 3], /// # by_id: common::types::IndexId, /// # ts: common::types::RepeatableTimestamp, /// # ) -> anyhow::Result<()> { /// # use futures::stream::TryStreamExt; /// # use std::pin::pin; /// let tables = vec![table1, table2, table3]; /// let page_size = 100; /// let mut iterator = db.table_iterator(ts, page_size).multi(tables.clone()); /// for tablet_id in tables { /// iterator.stream_documents_in_table(tablet_id, by_id, None).try_for_each(async |doc| { /// // handle doc /// Ok(()) /// }).await?; /// iterator.unregister_table(tablet_id); /// // afterward, `iterator` can no longer visit `tablet_id` /// } /// # Ok(()) /// # } /// ``` pub fn multi(self, tables: Vec<TabletId>) -> MultiTableIterator<RT> { MultiTableIterator { end_ts: self.inner.snapshot_ts, buffered_documents: tables .into_iter() .map(|tablet_id| (tablet_id, BTreeMap::new())) .collect(), inner: self.inner, } } #[try_stream(ok = LatestDocument, error = anyhow::Error)] pub async fn stream_documents_in_table( self, tablet_id: TabletId, by_id: IndexId, cursor: Option<ResolvedDocumentId>, ) { let mut iterator = self.multi(vec![]); let stream = iterator.stream_documents_in_table(tablet_id, by_id, cursor); pin_mut!(stream); while let Some(rev) = stream.try_next().await? { yield rev; } } #[try_stream(ok = (IndexKeyBytes, LatestDocument), error = anyhow::Error)] pub async fn stream_documents_in_table_by_index( self, tablet_id: TabletId, index_id: IndexId, indexed_fields: IndexedFields, cursor: Option<CursorPosition>, ) { let mut iterator = self.multi(vec![]); let stream = iterator.stream_documents_in_table_by_index( tablet_id, index_id, indexed_fields, cursor, ); pin_mut!(stream); while let Some(rev) = stream.try_next().await? { yield rev; } } } struct TableIteratorInner<RT: Runtime> { runtime: RT, persistence: Arc<dyn PersistenceReader>, retention_validator: Arc<dyn RetentionValidator>, page_size: usize, snapshot_ts: RepeatableTimestamp, } pub struct MultiTableIterator<RT: Runtime> { inner: TableIteratorInner<RT>, end_ts: RepeatableTimestamp, // Buffered document metadata between `snapshot_ts` and `end_ts`. // This is useful because there is no way to read the document log for just // a single table. buffered_documents: BufferedDocumentMetadata, } // For each table that we are interested in, stores a map containing every // observed id in that table and its first prev_ts (or None if the first row had // no prev_ts) type BufferedDocumentMetadata = BTreeMap<TabletId, BTreeMap<InternalId, Option<PrevTs>>>; #[derive(Copy, Clone, Debug)] struct PrevTs { prev_ts: Timestamp, } impl<RT: Runtime> MultiTableIterator<RT> { /// Signal that the given `tablet_id` will not be iterated in the future. /// The `tablet_id` must have been provided during this iterator's /// construction. /// /// Calling this is an optimization to reduce memory usage by dropping the /// list of changed documents. pub fn unregister_table(&mut self, tablet_id: TabletId) -> anyhow::Result<()> { self.buffered_documents .remove(&tablet_id) .context("unregister_table on an unknown table")?; Ok(()) } #[try_stream(ok = LatestDocument, error = anyhow::Error)] pub async fn stream_documents_in_table( &mut self, tablet_id: TabletId, by_id: IndexId, cursor: Option<ResolvedDocumentId>, ) { let stream = self.stream_documents_in_table_by_index( tablet_id, by_id, IndexedFields::by_id(), cursor.map(|id| CursorPosition::After(IndexKey::new(vec![], id.into()).to_bytes())), ); pin_mut!(stream); while let Some((_, rev)) = stream.try_next().await? { yield rev; } } /// Algorithm overview: /// Walk a table ordered by an index at snapshot_ts which may be outside /// retention, so you can't walk the index directly. /// /// 1. Walk the index at recent, increasing snapshots, and /// collect documents that have not changed since snapshot_ts. /// 2. Also walk the document log between snapshot_ts and the increasing /// snapshots, and for each changed document look at its revision as of /// snapshot_ts. /// 3. Merge the results. /// /// Why it works: /// Consider a document that exists in the index at snapshot_ts. /// Either it has changed since snapshot_ts, in which case (2) will find /// it, or it has not, in which case (1) will find it. /// /// (2) is implemented by recording the *first* prev_ts (if any) for each /// document id encountered in the document log. That prev_ts points to the /// document that belongs to the snapshot (or, if null, indicates that the /// document is new and doesn't belong in the snapshot). /// /// The document log can't be filtered by table, so `MultiTableIterator` /// additionally remembers this metadata for every table that the caller is /// interested in. This allows each subsequent table iteration to continue /// where the previous walk left off. #[try_stream(ok = (IndexKeyBytes, LatestDocument), error = anyhow::Error)] pub async fn stream_documents_in_table_by_index( &mut self, tablet_id: TabletId, index_id: IndexId, indexed_fields: IndexedFields, cursor: Option<CursorPosition>, ) { let mut cursor = TableScanCursor { index_key: cursor }; // skipped_keys are keys of documents that were modified after // snapshot_ts but before the index key was walked over. // Such documents must be // (a) modified/deleted after snapshot_ts but before new_end_ts. // (b) have key > cursor. // We insert skipped documents into future pages of the index walk when we get // to them. let mut skipped_keys = IterationDocuments::default(); // observed_ids is the set of document IDs in the table between // `(snapshot_ts, end_ts]`, and also corresponds to all the documents // that may have been inserted into `skipped_keys` (if present in the // snapshot & not already walked) let mut observed_ids: BTreeSet<InternalId> = BTreeSet::new(); if let Some(buffered_documents) = self.buffered_documents.get(&tablet_id) { // We've already walked the document log and stored some document // timestamps for this table. Load those documents and prepopulate // `skipped_keys`. observed_ids.extend(buffered_documents.keys().copied()); let mut revisions_at_snapshot = self .inner .load_revisions_at_snapshot_ts(stream::iter(buffered_documents.iter().filter_map( |(&id, &timestamps)| { timestamps.map(|ts| Ok((InternalDocumentId::new(tablet_id, id), ts))) }, ))) .boxed(); // `boxed()` instead of `pin_mut!` works around https://github.com/rust-lang/rust/issues/96865 let persistence_version = self.inner.persistence.version(); while let Some(rev) = revisions_at_snapshot.try_next().await? { let index_key = rev .value .index_key(&indexed_fields, persistence_version) .to_bytes(); skipped_keys.insert(index_key, rev.ts, rev.value, rev.prev_ts); } } else { // As a special case, the very first table visited by a // `MultiTableIterator` is allowed to be any table, even if it // wasn't specified. This is just to support the single-table // iteration methods on `TableIterator`. anyhow::ensure!( self.inner.snapshot_ts == self.end_ts, "this MultiTableIterator has already advanced from {snapshot_ts} to {end_ts}, but \ it has no buffered documents for table {tablet_id}", snapshot_ts = self.inner.snapshot_ts, end_ts = self.end_ts ); } loop { let pause_client = self.inner.runtime.pause_client(); pause_client.wait("before_index_page").await; let page_start = cursor.index_key.clone(); let (page, new_end_ts) = self .inner .fetch_page(index_id, tablet_id, &mut cursor) .await?; anyhow::ensure!(*new_end_ts >= self.end_ts); let page_end = cursor .index_key .as_ref() .context("cursor after page should not be empty")?; // Filter out rows from the index scan that were modified after // snapshot_ts. Such documents will be found when walking the // documents log to generate skipped_keys. let page: BTreeMap<_, _> = page .into_iter() .filter(|(_, rev)| rev.ts <= *self.inner.snapshot_ts) .map(|(index_key, LatestDocument { ts, value, prev_ts })| { (index_key, (ts, IterationDocument::Full { value, prev_ts })) }) .collect(); // 2. Find any keys for documents that were skipped by this // page or will be skipped by future pages. // These documents are returned with index keys and revisions as // they existed at snapshot_ts. self.inner .fetch_skipped_keys( tablet_id, &indexed_fields, page_start.as_ref(), *self.end_ts, new_end_ts, &mut skipped_keys, &mut observed_ids, &mut self.buffered_documents, ) .await?; if let Some((first_skipped_key, _)) = skipped_keys.iter().next() { // Check all skipped ids are after the old cursor, // which ensures the yielded output is in index key order. anyhow::ensure!(!cursor_has_walked(page_start.as_ref(), first_skipped_key)); } self.end_ts = new_end_ts; // Extract the documents from skipped_keys that should be returned in // the current page. let page_skipped_keys = { let mut page_skipped_keys = BTreeMap::new(); while let Some(first_skipped_key) = skipped_keys.keys().next() && cursor_has_walked(Some(page_end), first_skipped_key) { let (key, value) = skipped_keys .remove(&first_skipped_key.clone()) .context("skipped_keys should be nonempty")?; page_skipped_keys.insert(key, value); } page_skipped_keys }; // Merge index walk and skipped keys into BTreeMap, which sorts by index key. let merged_page = IterationDocuments::new(page.into_iter().chain(page_skipped_keys).collect()); // Sanity check output. let all_ids: BTreeSet<_> = merged_page .values() .map(|(_, doc)| doc.internal_id()) .collect(); anyhow::ensure!( all_ids.len() == merged_page.len(), "duplicate id in table iterator {merged_page:?}" ); anyhow::ensure!( merged_page .values() .all(|(ts, _)| *ts <= *self.inner.snapshot_ts), "document after snapshot in table iterator {merged_page:?}" ); anyhow::ensure!( merged_page.keys().all(|key| { !cursor_has_walked(page_start.as_ref(), key) && cursor_has_walked(Some(page_end), key) }), "document outside page in table iterator {merged_page:?}" ); let mut merged_page_docs = self.inner.reload_revisions_at_snapshot_ts(merged_page); while let Some((key, rev)) = merged_page_docs.try_next().await? { // The caller will likely consume the documents in a CPU-intensive loop, // and `merged_page_docs.try_next().await` will often be Ready // immediately, so it won't yield. // Make sure we yield to not starve other tokio tasks. tokio::task::consume_budget().await; yield (key, rev); } if matches!(page_end, CursorPosition::End) { // If we are done, all skipped_keys would be put in this final page. anyhow::ensure!(skipped_keys.is_empty()); break; } } } } impl<RT: Runtime> TableIteratorInner<RT> { /// A document may be skipped if: /// 1. it is in the correct table /// 2. at the snapshot, it had a key higher than what we've walked so far /// 3. it was modified after the snapshot but before we walked its key /// range. #[fastrace::trace] async fn fetch_skipped_keys( &self, tablet_id: TabletId, indexed_fields: &IndexedFields, lower_bound: Option<&CursorPosition>, start_ts: Timestamp, end_ts: RepeatableTimestamp, output: &mut IterationDocuments, observed_ids: &mut BTreeSet<InternalId>, buffered_documents: &mut BufferedDocumentMetadata, ) -> anyhow::Result<()> { let reader = self.persistence.clone(); let persistence_version = reader.version(); let skipped_revs = self.walk_document_log( tablet_id, start_ts, end_ts, observed_ids, buffered_documents, ); pin_mut!(skipped_revs); let revisions_at_snapshot = self.load_revisions_at_snapshot_ts(skipped_revs); pin_mut!(revisions_at_snapshot); while let Some(rev) = revisions_at_snapshot.try_next().await? { let index_key = rev .value .index_key(indexed_fields, persistence_version) .to_bytes(); if !cursor_has_walked(lower_bound, &index_key) { output.insert(index_key, rev.ts, rev.value, rev.prev_ts); } } Ok(()) } #[try_stream(ok = (InternalDocumentId, PrevTs), error = anyhow::Error)] async fn walk_document_log<'a>( &'a self, tablet_id: TabletId, start_ts: Timestamp, end_ts: RepeatableTimestamp, observed_ids: &'a mut BTreeSet<InternalId>, buffered_documents: &'a mut BufferedDocumentMetadata, ) { let reader = self.persistence.clone(); let repeatable_persistence = RepeatablePersistence::new(reader, end_ts, self.retention_validator.clone()); // TODO: don't fetch document contents from the database let documents = repeatable_persistence .load_documents(TimestampRange::new(start_ts.succ()?..=*end_ts), Order::Asc); pin_mut!(documents); while let Some(entry) = documents.try_next().await? { if let Some(buffer) = buffered_documents.get_mut(&entry.id.table()) { // Don't overwrite any existing entry at `id` buffer .entry(entry.id.internal_id()) .or_insert(entry.prev_ts.map(|prev_ts| PrevTs { prev_ts })); } if entry.id.table() == tablet_id { // only yield if this is the first time we have seen this ID if observed_ids.insert(entry.id.internal_id()) { // If prev_ts is None, we still add to `observed_ids` to // ignore future log entries with this `id` if let Some(prev_ts) = entry.prev_ts { yield (entry.id, PrevTs { prev_ts }); } } } } } /// We have these constraints: /// /// 1. we need each walk to be >= snapshot_ts /// 2. we need each successive walk to be >= the previous walk /// 3. we need each walk to be repeatable /// 4. we need each walk to be within retention /// /// We can satisfy these constraints by always walking at max(snapshot_ts, /// new_static_repeatable_recent()). /// /// 1. max(snapshot_ts, anything) >= snapshot_ts /// 2. snapshot_ts never changes and new_static_repeatable_recent is weakly /// monotonically increasing /// 3. snapshot_ts and new_static_repeatable_recent are both Repeatable, and /// the max of Repeatable timestamps is repeatable. /// 4. new_static_repeatable_recent is within retention, so max(anything, /// new_static_repeatable_recent()) is within retention. async fn new_ts(&self) -> anyhow::Result<RepeatableTimestamp> { Ok(cmp::max( self.snapshot_ts, new_static_repeatable_recent(self.persistence.as_ref()).await?, )) } #[fastrace::trace] async fn fetch_page( &self, index_id: IndexId, tablet_id: TabletId, cursor: &mut TableScanCursor, ) -> anyhow::Result<(Vec<(IndexKeyBytes, LatestDocument)>, RepeatableTimestamp)> { let ts = self.new_ts().await?; let repeatable_persistence = RepeatablePersistence::new( self.persistence.clone(), ts, self.retention_validator.clone(), ); let reader = repeatable_persistence.read_snapshot(ts)?; let stream = reader.index_scan( index_id, tablet_id, &cursor.interval(), Order::Asc, self.page_size, ); let documents_in_page: Vec<_> = stream.take(self.page_size).try_collect().await?; if documents_in_page.len() < self.page_size { cursor.advance(CursorPosition::End)?; } else if let Some((index_key, ..)) = documents_in_page.last() { cursor.advance(CursorPosition::After(index_key.clone()))?; } Ok((documents_in_page, ts)) } /// Load the revisions of documents visible at `self.snapshot_ts`. /// Documents are yielded in the same order as input, skipping duplicates /// and documents that didn't exist at the snapshot. #[try_stream(ok = LatestDocument, error = anyhow::Error)] async fn load_revisions_at_snapshot_ts<'a>( &'a self, ids: impl Stream<Item = anyhow::Result<(InternalDocumentId, PrevTs)>> + 'a, ) { let repeatable_persistence = RepeatablePersistence::new( self.persistence.clone(), self.snapshot_ts, self.retention_validator.clone(), ); // Note even though `previous_revisions` can paginate internally, we don't want // to hold the entire result set in memory, because documents can be large. let id_chunks = ids.try_chunks2(self.page_size); pin_mut!(id_chunks); let snapshot_ts_succ = self.snapshot_ts.succ()?; let make_query = |&(id, PrevTs { prev_ts }): &(InternalDocumentId, PrevTs)| DocumentPrevTsQuery { id, // HAX: we do not remember the `ts` of the original row that this // query came from. However, `snapshot_ts_succ` is correct for the // purposes of the retention validator since `prev_ts` was latest as of // `snapshot_ts`. ts: snapshot_ts_succ, prev_ts, }; while let Some(chunk) = id_chunks.try_next().await? { for q in &chunk { anyhow::ensure!( snapshot_ts_succ > q.1.prev_ts, "Querying a prev_ts {prev_ts} that does not lie within the snapshot \ {snapshot_ts}", prev_ts = q.1.prev_ts, snapshot_ts = self.snapshot_ts ); } let ids_to_load = chunk.iter().map(make_query).collect(); let mut old_revisions = repeatable_persistence .previous_revisions_of_documents(ids_to_load) .await?; // Yield in the same order as the input for q in chunk { let DocumentLogEntry { ts, value, prev_ts, .. } = old_revisions .remove(&make_query(&q)) .with_context(|| format!("Missing revision at snapshot: {q:?}"))?; let Some(value) = value else { continue }; yield LatestDocument { ts, value, prev_ts }; } anyhow::ensure!( old_revisions.is_empty(), "logic error: unfetched results remain in old_revisions" ); } } #[try_stream(boxed, ok = (IndexKeyBytes, LatestDocument), error = anyhow::Error)] async fn load_index_entries_at_snapshot_ts( &self, entries: Vec<(InternalDocumentId, Timestamp, IndexKeyBytes)>, ) { let ids: Vec<_> = entries .iter() .map(|&(id, ts, _)| (id, PrevTs { prev_ts: ts })) .collect(); let mut key_by_id: BTreeMap<_, _> = entries.into_iter().map(|(id, _, key)| (id, key)).collect(); let revisions = self.load_revisions_at_snapshot_ts(stream::iter(ids.into_iter().map(Ok))); pin_mut!(revisions); while let Some(rev) = revisions.try_next().await? { let key = key_by_id .remove(&rev.value.id_with_table_id()) .context("key_by_id missing")?; yield (key, rev); } } /// Like `load_revisions_at_snapshot_ts` but doesn't need to fetch /// if the IterationDocument has the Full document. #[try_stream(boxed, ok = (IndexKeyBytes, LatestDocument), error = anyhow::Error)] async fn reload_revisions_at_snapshot_ts(&self, documents: IterationDocuments) { let mut current_batch = Vec::new(); for (key, (ts, doc)) in documents.into_iter() { match doc { IterationDocument::Full { value, prev_ts } => { let mut flush = self.load_index_entries_at_snapshot_ts(current_batch); while let Some((key, rev)) = flush.try_next().await? { yield (key, rev); } current_batch = Vec::new(); yield (key, LatestDocument { ts, value, prev_ts }); }, IterationDocument::Id(id) => { current_batch.push((id, ts, key)); }, } } let mut flush = self.load_index_entries_at_snapshot_ts(current_batch); while let Some((key, rev)) = flush.try_next().await? { yield (key, rev); } } } #[derive(Debug)] enum IterationDocument { Full { value: ResolvedDocument, prev_ts: Option<Timestamp>, }, Id(InternalDocumentId), } impl IterationDocument { fn internal_id(&self) -> InternalId { match self { Self::Full { value, .. } => value.internal_id(), Self::Id(id) => id.internal_id(), } } } /// To avoid storing too many documents in memory, we evict the document values, /// leaving only the IDs. #[derive(Default, Debug)] struct IterationDocuments { count_full: usize, docs: BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>, } impl IterationDocuments { fn new(docs: BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>) -> Self { Self { count_full: docs .values() .filter(|(_, doc)| matches!(doc, IterationDocument::Full { .. })) .count(), docs, } } fn insert( &mut self, index_key: IndexKeyBytes, ts: Timestamp, doc: ResolvedDocument, prev_ts: Option<Timestamp>, ) { if self.count_full < *DOCUMENTS_IN_MEMORY { self.docs.insert( index_key, ( ts, IterationDocument::Full { value: doc, prev_ts, }, ), ); self.count_full += 1; } else { self.docs.insert( index_key, (ts, IterationDocument::Id(doc.id_with_table_id())), ); } } fn remove( &mut self, index_key: &IndexKeyBytes, ) -> Option<(IndexKeyBytes, (Timestamp, IterationDocument))> { let removed = self.docs.remove_entry(index_key); if let Some((_, (_, IterationDocument::Full { .. }))) = &removed { self.count_full -= 1; } removed } } impl IntoIterator for IterationDocuments { type IntoIter = <BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)> as IntoIterator>::IntoIter; type Item = (IndexKeyBytes, (Timestamp, IterationDocument)); fn into_iter(self) -> Self::IntoIter { self.docs.into_iter() } } impl Deref for IterationDocuments { type Target = BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>; fn deref(&self) -> &Self::Target { &self.docs } } #[cfg(test)] mod tests { use std::collections::{ BTreeMap, BTreeSet, }; use cmd_util::env::env_config; use common::{ bootstrap_model::index::{ database_index::IndexedFields, IndexMetadata, }, pause::PauseController, runtime::Runtime, types::{ unchecked_repeatable_ts, GenericIndexName, IndexDescriptor, IndexName, }, value::{ ConvexObject, FieldName, FieldPath, TableName, }, }; use futures::TryStreamExt; use keybroker::Identity; use prop::collection::vec as prop_vec; use proptest::prelude::*; use runtime::testing::{ TestDriver, TestRuntime, }; use tokio::sync::oneshot; use value::{ assert_obj, assert_val, proptest::{ RestrictNaNs, ValueBranching, }, resolved_object_strategy, resolved_value_strategy, ExcludeSetsAndMaps, InternalId, TableNamespace, }; use crate::{ test_helpers::{ new_test_database, DbFixtures, }, IndexModel, TestFacingModel, Transaction, UserFacingModel, }; fn small_user_object() -> impl Strategy<Value = ConvexObject> { let values = resolved_value_strategy( FieldName::user_strategy, ValueBranching::small(), ExcludeSetsAndMaps(false), RestrictNaNs(false), ); resolved_object_strategy(FieldName::user_strategy(), values, 0..4) } #[derive(Debug, proptest_derive::Arbitrary)] enum Update { Insert { #[proptest(strategy = "small_user_object()")] object: ConvexObject, }, Replace { index: usize, #[proptest(strategy = "small_user_object()")] object: ConvexObject, }, Delete { index: usize, }, } fn racing_updates() -> impl Strategy<Value = Vec<Vec<Update>>> { prop_vec(prop_vec(any::<Update>(), 0..4), 0..4) } fn small_user_objects() -> impl Strategy<Value = Vec<ConvexObject>> { prop_vec(small_user_object(), 1..8) } fn by_id_index<RT: Runtime>( tx: &mut Transaction<RT>, table_name: &TableName, ) -> anyhow::Result<InternalId> { let by_id = IndexName::by_id(table_name.clone()); let by_id_metadata = IndexModel::new(tx) .enabled_index_metadata(TableNamespace::test_user(), &by_id)? .unwrap(); Ok(by_id_metadata.id().internal_id()) } fn iterator_includes_all_documents_test(table_name: TableName, objects: Vec<ConvexObject>) { let td = TestDriver::new(); let runtime = td.rt(); let test = async { let database = new_test_database(runtime.clone()).await; let mut expected = BTreeSet::new(); let mut tx = database.begin(Identity::system()).await?; for object in objects { let id = TestFacingModel::new(&mut tx) .insert(&table_name, object) .await?; expected.insert(id.internal_id()); } let table_mapping = tx.table_mapping().namespace(TableNamespace::test_user()); let by_id = by_id_index(&mut tx, &table_name)?; database.commit(tx).await?; let iterator = database.table_iterator(database.now_ts_for_reads(), 2); let tablet_id = table_mapping.id(&table_name)?.tablet_id; let revision_stream = iterator.stream_documents_in_table(tablet_id, by_id, None); futures::pin_mut!(revision_stream); let mut actual = BTreeSet::new(); while let Some(revision) = revision_stream.try_next().await? { actual.insert(revision.value.id().internal_id()); } assert_eq!(expected, actual); Ok::<_, anyhow::Error>(()) }; td.run_until(test).unwrap(); } async fn racing_commits_test( runtime: TestRuntime, table_name: TableName, initial: Vec<ConvexObject>, update_batches: Vec<Vec<Update>>, pause: PauseController, ) -> anyhow::Result<()> { let database = new_test_database(runtime.clone()).await; let mut objects = BTreeMap::new(); let mut tx = database.begin(Identity::system()).await?; for object in initial { let inserted_id = TestFacingModel::new(&mut tx) .insert(&table_name, object) .await?; let inserted = tx.get(inserted_id).await?.unwrap(); objects.insert(inserted_id, inserted.to_developer()); } // We expect the iterator to produce the initial objects. let expected = objects.clone(); let table_mapping = tx.table_mapping().namespace(TableNamespace::test_user()); let by_id = by_id_index(&mut tx, &table_name)?; database.commit(tx).await?; let hold_guard = pause.hold("before_index_page"); let snapshot_ts = database.now_ts_for_reads(); let iterator = database.table_iterator(snapshot_ts, 2); let tablet_id = table_mapping.id(&table_name)?.tablet_id; let revision_stream = iterator.stream_documents_in_table(tablet_id, by_id, None); let table_name_ = table_name.clone(); let database_ = database.clone(); let (stream_done_tx, mut stream_done_rx) = oneshot::channel(); let test_driver = async move { let mut hold_guard = hold_guard; for update_batch in update_batches { // Run the backfill process until it hits our breakpoint. let pause_guard = tokio::select! { _ = &mut stream_done_rx => break, pause_guard = hold_guard.wait_for_blocked() => match pause_guard { Some(pause_guard) => pause_guard, // If the worker has finished processing index pages, stop agitating. None => break, }, }; // Agitate by doing a concurrent update while the worker is blocked. for update in update_batch { let mut tx = database_.begin(Identity::system()).await?; match update { Update::Insert { object } => { let inserted_id = TestFacingModel::new(&mut tx) .insert(&table_name_, object) .await?; let inserted = tx.get(inserted_id).await?.unwrap(); objects.insert(inserted_id, inserted.to_developer()); }, Update::Replace { index, object } => { if objects.is_empty() { continue; } let id = *(objects.keys().nth(index % objects.len()).unwrap()); let replaced = UserFacingModel::new_root_for_test(&mut tx) .replace(id.into(), object) .await?; objects.insert(id, replaced); }, Update::Delete { index } => { if objects.is_empty() { continue; } let id = *(objects.keys().nth(index % objects.len()).unwrap()); UserFacingModel::new_root_for_test(&mut tx) .delete(id.into()) .await?; objects.remove(&id).unwrap(); }, } database_.commit(tx).await?; // TableIterator walks by_id at pages with timestamp // max(snapshot_ts, max_repeatable_ts), so we bump max_repeatable_ts // to make the commit visible to TableIterator. database_.bump_max_repeatable_ts().await?; } hold_guard = pause.hold("before_index_page"); // Continue the worker. pause_guard.unpause(); } anyhow::Ok(()) }; let documents_from_iterator = async move { futures::pin_mut!(revision_stream); let mut actual = BTreeMap::new(); let mut prev_doc_id = None; while let Some(rev) = revision_stream.try_next().await? { assert!(rev.ts <= *snapshot_ts); assert!(prev_doc_id < Some(rev.value.id())); prev_doc_id = Some(rev.value.id()); actual.insert(rev.value.id(), rev.value.to_developer()); } let _ = stream_done_tx.send(()); Ok(actual) }; let result_future = async move { futures::try_join!(documents_from_iterator, test_driver) }; let (computed, _) = result_future.await?; assert_eq!(expected, computed); anyhow::Ok(()) } #[convex_macro::test_runtime] async fn test_deleted(rt: TestRuntime, pause: PauseController) -> anyhow::Result<()> { racing_commits_test( rt, "A".parse()?, vec![assert_obj!()], vec![vec![Update::Delete { index: 0 }]], pause, ) .await } #[convex_macro::test_runtime] async fn test_index_key_change(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db: database, .. } = DbFixtures::new(&rt).await?; let table_name: TableName = "a".parse()?; // Create a.by_k and backfill. let index_name = GenericIndexName::new(table_name.clone(), IndexDescriptor::new("by_k")?)?; let field: FieldPath = "k".parse()?; let index_fields = IndexedFields::try_from(vec![field.clone()])?; let mut tx = database.begin(Identity::system()).await?; IndexModel::new(&mut tx) .add_application_index( TableNamespace::test_user(), IndexMetadata::new_enabled(index_name.clone(), index_fields.clone()), ) .await?; database.commit(tx).await?; // Two documents, one which changes from "z" to "a". // In the first page, we read "a" and skip it. // Then we walk the documents log, and put it in skipped_keys as "z". // In the second page, we read "m" and output it. Then we output "z" // from the skipped keys. let mut tx = database.begin(Identity::system()).await?; TestFacingModel::new(&mut tx) .insert(&table_name, assert_obj!("k" => "m")) .await?; let id = TestFacingModel::new(&mut tx) .insert(&table_name, assert_obj!("k" => "z")) .await?; let table_mapping = tx.table_mapping().namespace(TableNamespace::test_user()); let by_k_metadata = IndexModel::new(&mut tx) .enabled_index_metadata(TableNamespace::test_user(), &index_name)? .unwrap(); let by_k_id = by_k_metadata.id().internal_id(); let snapshot_ts = unchecked_repeatable_ts(database.commit(tx).await?); let mut tx = database.begin(Identity::system()).await?; UserFacingModel::new_root_for_test(&mut tx) .replace(id.into(), assert_obj!("k" => "a")) .await?; database.commit(tx).await?; database.bump_max_repeatable_ts().await?; let iterator = database.table_iterator(snapshot_ts, 1); let tablet_id = table_mapping.id(&table_name)?.tablet_id; let revisions: Vec<_> = iterator .stream_documents_in_table_by_index(tablet_id, by_k_id, index_fields, None) .try_collect() .await?; assert_eq!(revisions.len(), 2); let k_values: Vec<_> = revisions .iter() .map(|(_, rev)| rev.value.value().get("k").unwrap().clone()) .collect(); assert_eq!(k_values, vec![assert_val!("m"), assert_val!("z")]); Ok(()) } #[convex_macro::test_runtime] async fn test_multi_iterator(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db: database, .. } = DbFixtures::new(&rt).await?; let mut tx = database.begin(Identity::system()).await?; let num_tables = 8; let table_names: Vec<TableName> = (0..num_tables) .map(|i| format!("table{i}").parse()) .collect::<Result<_, _>>()?; let mut docs = vec![]; for (i, table_name) in table_names.iter().enumerate() { let mut docs_in_table = vec![]; for j in 0..=i { docs_in_table.push( TestFacingModel::new(&mut tx) .insert_and_get( table_name.clone(), assert_obj!("a" => format!("value{i}_{j}")), ) .await?, ); } docs_in_table.sort_by_key(|d| d.id()); docs.push(docs_in_table); } let table_mapping = tx.table_mapping().namespace(TableNamespace::test_user()); let tablet_ids: Vec<_> = table_names .iter() .map(|name| Ok(table_mapping.id(name)?.tablet_id)) .collect::<anyhow::Result<_>>()?; let by_ids: Vec<_> = table_names .iter() .map(|name| by_id_index(&mut tx, name)) .collect::<anyhow::Result<_>>()?; database.commit(tx).await?; let snapshot_ts = unchecked_repeatable_ts(database.bump_max_repeatable_ts().await?); let mut iterator = database .table_iterator(snapshot_ts, 3) .multi(tablet_ids.clone()); for (i, &tablet_id) in tablet_ids.iter().enumerate() { // Should observe the original version of the document in the table let documents: Vec<_> = iterator .stream_documents_in_table(tablet_id, by_ids[i], None) .try_collect() .await?; assert_eq!( documents .iter() .map(|d| d.value.clone()) .collect::<Vec<_>>(), docs[i] ); // Do some more changes to interfere with the next iteration let mut tx = database.begin(Identity::system()).await?; for table in &table_names { TestFacingModel::new(&mut tx) .insert(table, assert_obj!("a" => "blah")) .await?; } for docs_in_table in &docs { for doc in docs_in_table { tx.replace_inner(doc.id(), assert_obj!("a" => "changed")) .await?; } } database.commit(tx).await?; database.bump_max_repeatable_ts().await?; // Also, it should be ok to query the same table more than once let documents_again: Vec<_> = iterator .stream_documents_in_table(tablet_id, by_ids[i], None) .try_collect() .await?; assert_eq!(documents, documents_again); iterator.unregister_table(tablet_id)?; } Ok(()) } proptest! { #![proptest_config( ProptestConfig { cases: 256 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, ..ProptestConfig::default() } )] #[test] fn test_iterator_includes_all_documents( table_name in TableName::user_strategy(), objects in small_user_objects(), ) { iterator_includes_all_documents_test(table_name, objects); } #[test] fn test_racing_commits( table_name in TableName::user_strategy(), initial in small_user_objects(), update_batches in racing_updates(), ) { let (pause, pause_client) = PauseController::new(); let td = TestDriver::new_with_pause_client(pause_client); td.run_until( racing_commits_test(td.rt(), table_name, initial, update_batches, pause), ).unwrap(); } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server