Skip to main content
Glama

Convex MCP server

Official
by get-convex
transaction_index.rs52.1 kB
use std::{ cmp::Ordering, collections::BTreeMap, sync::{ Arc, OnceLock, }, }; use async_trait::async_trait; use common::{ bootstrap_model::index::{ database_index::{ DatabaseIndexSpec, IndexedFields, }, IndexConfig, }, document::{ DocumentUpdate, PackedDocument, ResolvedDocument, }, index::{ IndexKey, IndexKeyBytes, }, interval::Interval, knobs::TRANSACTION_MAX_READ_SIZE_BYTES, query::{ CursorPosition, InternalSearch, Order, SearchVersion, }, runtime, types::{ DatabaseIndexUpdate, DatabaseIndexValue, IndexId, IndexName, TabletIndexName, WriteTimestamp, }, }; use imbl::OrdMap; use indexing::{ backend_in_memory_indexes::{ DatabaseIndexSnapshot, LazyDocument, RangeRequest, }, index_registry::{ Index, IndexRegistry, }, }; use search::{ query::RevisionWithKeys, CandidateRevision, QueryResults, Searcher, TextIndexManager, }; use storage::Storage; use tokio::task; use value::{ DeveloperDocumentId, FieldPath, }; use crate::{ preloaded::PreloadedIndexRange, query::IndexRangeResponse, reads::TransactionReadSet, writes::PendingWrites, DEFAULT_PAGE_SIZE, }; /// [`TransactionIndex`] is an index used by transactions. /// It gets constructed from [`DatabaseIndexSnapshot`] and [`IndexRegistry`] at /// a timestamp snapshot. It buffers the transaction pending index updates and /// merges and overlays them on top of the snapshot to allow the transaction to /// read its own writes. #[derive(Clone)] pub struct TransactionIndex { // Metadata about existing indexes with any changes to the index tables applied. Note that // those changes are stored separately in `database_index_updates` and `search_index_updates` // in their underlying database writes too. index_registry: IndexRegistry, // Weather the index registry has been updates since the beginning of the transaction. index_registry_updated: bool, // Database indexes combine a base index snapshot in persistence with pending updates applied // in-memory. database_index_snapshot: DatabaseIndexSnapshot, database_index_updates: OrdMap<IndexId, TransactionIndexMap>, // Similar to database indexes, text search indexes are implemented by applying pending updates // on top of the transaction base snapshot. text_index_snapshot: Arc<dyn TransactionTextSnapshot>, text_index_updates: OrdMap<IndexId, Vec<DocumentUpdate>>, } impl PendingWrites for TransactionIndex {} impl TransactionIndex { pub fn new( index_registry: IndexRegistry, database_index_snapshot: DatabaseIndexSnapshot, text_index_snapshot: Arc<dyn TransactionTextSnapshot>, ) -> Self { Self { index_registry, index_registry_updated: false, database_index_snapshot, database_index_updates: OrdMap::new(), text_index_snapshot, text_index_updates: OrdMap::new(), } } pub fn index_registry(&self) -> &IndexRegistry { &self.index_registry } /// Range over a index including pending updates. /// `max_size` provides an estimate of the number of rows to be /// streamed from the database. /// The returned vecs may be larger or smaller than `max_size` depending on /// pending writes. pub(crate) async fn range_no_deps( &mut self, ranges: &[&RangeRequest], ) -> Vec< anyhow::Result<( Vec<(IndexKeyBytes, LazyDocument, WriteTimestamp)>, CursorPosition, )>, > { let snapshot_results = self.database_index_snapshot.range_batch(ranges).await; let batch_size = ranges.len(); let mut results = Vec::with_capacity(batch_size); for (&range_request, snapshot_result) in ranges.iter().zip(snapshot_results) { let result = try { let (snapshot_result_vec, cursor) = snapshot_result?; let mut snapshot_it = snapshot_result_vec.into_iter(); let index_registry = &self.index_registry; let database_index_updates = &self.database_index_updates; let pending_it = match index_registry.require_enabled( &range_request.index_name, &range_request.printable_index_name, ) { Ok(index) => database_index_updates.get(&index.id()), // Range queries on missing tables are allowed for system provided indexes. Err(_) if range_request.index_name.is_by_id_or_creation_time() => None, Err(e) => Err(e)?, } .map(|pending| pending.range(&range_request.interval)) .into_iter() .flatten(); let mut pending_it = range_request.order.apply(pending_it); let mut snapshot_next = snapshot_it.next(); let mut pending_next = pending_it.next(); let mut range_results = vec![]; loop { task::consume_budget().await; match (snapshot_next, pending_next) { ( Some((snapshot_key, snapshot_ts, snapshot_doc)), Some((pending_key, maybe_pending_doc)), ) => { let cmp = match range_request.order { Order::Asc => snapshot_key.cmp(&pending_key), Order::Desc => pending_key.cmp(&snapshot_key), }; match cmp { Ordering::Less => { range_results.push(( snapshot_key, snapshot_doc, WriteTimestamp::Committed(snapshot_ts), )); snapshot_next = snapshot_it.next(); pending_next = Some((pending_key, maybe_pending_doc)); }, Ordering::Equal => { // The pending entry overwrites the snapshot one. if let Some(pending_doc) = maybe_pending_doc { range_results.push(( pending_key, pending_doc.into(), WriteTimestamp::Pending, )); }; snapshot_next = snapshot_it.next(); pending_next = pending_it.next(); }, Ordering::Greater => { if let Some(pending_doc) = maybe_pending_doc { range_results.push(( pending_key, pending_doc.into(), WriteTimestamp::Pending, )); }; snapshot_next = Some((snapshot_key, snapshot_ts, snapshot_doc)); pending_next = pending_it.next(); }, } }, (Some((snapshot_key, snapshot_ts, snapshot_doc)), None) => { range_results.push(( snapshot_key, snapshot_doc, WriteTimestamp::Committed(snapshot_ts), )); snapshot_next = snapshot_it.next(); pending_next = None; }, (None, Some((pending_key, maybe_pending_doc))) => { if let Some(pending_doc) = maybe_pending_doc { range_results.push(( pending_key, pending_doc.into(), WriteTimestamp::Pending, )); }; snapshot_next = None; pending_next = pending_it.next(); }, (None, None) => break, } } if !range_request.interval.contains_cursor(&cursor) { Err(anyhow::anyhow!( "query for {:?} not making progress", range_request.interval ))?; } (range_results, cursor) }; results.push(result); } assert_eq!(results.len(), batch_size); results } #[fastrace::trace] pub async fn search( &mut self, reads: &mut TransactionReadSet, query: &InternalSearch, index_name: TabletIndexName, version: SearchVersion, ) -> anyhow::Result<Vec<(CandidateRevision, IndexKeyBytes)>> { // We do not allow modifying the index registry and performing a text search // in the same transaction. We could implement this by sending the index // updates in the search request, but there is no need to bother since we // don't yet have a use case of modifying an index metadata and performing // a text search in the same transaction. anyhow::ensure!( !self.index_registry_updated, "Text search and index registry update not allowed in the same transaction" ); // HACK: instead of using `self.require_enabled` we access the // `IndexRegistry` directly to fetch index info, which skips recording the // read of `index.id()` into our `TransactionReadSet`. // This avoids invalidating the transaction based on the precise value // of the `TextIndexState`, as the transaction does not logically depend // on it, and therefore avoids invalidation after flushing or compacting // search indexes. // TODO(ENG-9324): this has the side effect of failing to invalidate // transactions if the search index is removed. In practice, that should // only happen as part of a push that would separately invalidate user // transactions anyway. let index = self .index_registry .require_enabled(&index_name, &query.printable_index_name()?)?; let empty = vec![]; let pending_updates = self.text_index_updates.get(&index.id).unwrap_or(&empty); let results = self .text_index_snapshot .search(&index, query, version, pending_updates) .await?; // TODO: figure out if we want to charge database bandwidth for reading search // index metadata once search is no longer beta // Record the query results in the read set. reads.record_search(index_name.clone(), results.reads); Ok(results.revisions_with_keys) } /// Fetch a batch of index ranges. This method does not update the read set, /// since we might be fetching more documents than the caller actually needs /// due to filtering. /// /// Callers must call `record_indexed_directly` when consuming the results. pub async fn range_batch( &mut self, ranges: &[&RangeRequest], ) -> Vec<anyhow::Result<IndexRangeResponse>> { let batch_size = ranges.len(); let mut results = Vec::with_capacity(batch_size); let fetch_results = self.range_no_deps(ranges).await; for ( RangeRequest { index_name: _, printable_index_name: _, interval, order: _, max_size, }, fetch_result, ) in ranges.iter().zip(fetch_results) { let result: anyhow::Result<_> = try { let (documents, fetch_cursor) = fetch_result?; let mut total_bytes = 0; let mut within_bytes_limit = true; let out: Vec<_> = documents .into_iter() .map(|(key, doc, ts)| (key, doc.unpack(), ts)) .take(*max_size) .take_while(|(_, document, _)| { within_bytes_limit = total_bytes < *TRANSACTION_MAX_READ_SIZE_BYTES; // Allow the query to exceed the limit by one document so the query // is guaranteed to make progress and probably fail. // Note system document limits are different, so a single document // can be larger than `TRANSACTION_MAX_READ_SIZE_BYTES`. total_bytes += document.size(); within_bytes_limit }) .collect(); let cursor = if let Some((last_key, ..)) = out.last() && (out.len() >= *max_size || !within_bytes_limit) { // We hit an early termination condition within this page. CursorPosition::After(last_key.clone()) } else { // Everything fetched will be returned, so the cursor // of the page is the fetch cursor fetch_cursor }; if !interval.contains_cursor(&cursor) { Err(anyhow::anyhow!( "query for {interval:?} not making progress" ))?; } IndexRangeResponse { page: out, cursor } }; results.push(result); } assert_eq!(results.len(), batch_size); results } /// Returns the next page from the index range. /// NOTE: the caller must call reads.record_read_document for any /// documents yielded from the index scan and /// `reads.record_indexed_directly` for the interval actually read. /// Returns the remaining interval that was skipped because of max_size or /// transaction size limits. #[cfg(any(test, feature = "testing"))] pub async fn range( &mut self, range_request: RangeRequest, ) -> anyhow::Result<IndexRangeResponse> { let [result] = self .range_batch(&[&range_request]) .await .try_into() .map_err(|_| anyhow::anyhow!("wrong number of results"))?; result } #[fastrace::trace] pub async fn preload_index_range( &mut self, reads: &mut TransactionReadSet, tablet_index_name: &TabletIndexName, printable_index_name: &IndexName, interval: &Interval, ) -> anyhow::Result<PreloadedIndexRange> { let index = self.require_enabled(reads, tablet_index_name, printable_index_name)?; let IndexConfig::Database { spec: DatabaseIndexSpec { ref fields, .. }, .. } = index.metadata().config else { anyhow::bail!("{printable_index_name} isn't a database index"); }; let indexed_fields: Vec<FieldPath> = fields.clone().into(); let indexed_field = indexed_fields[0].clone(); anyhow::ensure!(indexed_fields.len() == 1); let mut remaining_interval = interval.clone(); let mut preloaded = BTreeMap::new(); while !remaining_interval.is_empty() { let [result] = self .range_no_deps(&[&RangeRequest { index_name: tablet_index_name.clone(), printable_index_name: printable_index_name.clone(), interval: remaining_interval, order: Order::Asc, max_size: DEFAULT_PAGE_SIZE, }]) .await .try_into() .map_err(|_| anyhow::anyhow!("wrong number of results"))?; let (documents, cursor) = result?; (_, remaining_interval) = interval.split(cursor, Order::Asc); for (_, document, _) in documents { let document = document.unpack(); let key = document.value().0.get_path(&indexed_field).cloned(); anyhow::ensure!( preloaded.insert(key, document).is_none(), "Index {printable_index_name:?} isn't unique", ); } } // Since PreloadedIndexRange only permits looking up documents by the index // key, we don't need to record `interval` as a read dependency. Put another // way, even though we're reading all of the rows in `interval`, the layer // above is only allowed to do point queries against `index_name`. Ok(PreloadedIndexRange::new( printable_index_name.table().clone(), tablet_index_name.clone(), indexed_field, preloaded, )) } // TODO: Add precise error types to facilitate detecting which indexing errors // are the developer's fault or not. pub fn begin_update( &mut self, old_document: Option<ResolvedDocument>, new_document: Option<ResolvedDocument>, ) -> anyhow::Result<Update<'_>> { let mut registry = self.index_registry.clone(); registry.update(old_document.as_ref(), new_document.as_ref())?; Ok(Update { index: self, deletion: old_document, insertion: new_document, registry, }) } fn finish_update( &mut self, old_document: Option<ResolvedDocument>, new_document: Option<ResolvedDocument>, ) -> Vec<DatabaseIndexUpdate> { // Update the index registry first. let index_registry_updated = self .index_registry .apply_verified_update(old_document.as_ref(), new_document.as_ref()); self.index_registry_updated |= index_registry_updated; // Then compute the index updates. let updates = self .index_registry .index_updates(old_document.as_ref(), new_document.as_ref()); // Add the index updates to self.database_index_updates. for update in &updates { let new_value = match &update.value { DatabaseIndexValue::Deleted => None, DatabaseIndexValue::NonClustered(doc_id) => { // The pending updates are clustered. Get the document // from the update itself. match new_document { Some(ref doc) => { assert_eq!(doc.id(), *doc_id); Some(doc) }, None => panic!("Unexpected index update: {:?}", update.value), } }, }; self.database_index_updates .entry(update.index_id) .or_insert_with(TransactionIndexMap::new) .insert(update.key.to_bytes(), new_value); } // If we are updating a document, the old and new ids must be the same. let document_id = new_document .as_ref() .map(|d| d.id()) .or(old_document.as_ref().map(|d| d.id())); if let Some(id) = document_id { // Add the update to all affected text search indexes. for index in self.index_registry.text_indexes_by_table(id.tablet_id) { self.text_index_updates .entry(index.id) .or_default() .push(DocumentUpdate { id, old_document: old_document.clone(), new_document: new_document.clone(), }); } } // Note that we do not update the vector index and we always read at the // base snapshot. updates } pub fn get_pending( &self, reads: &mut TransactionReadSet, index_name: &TabletIndexName, ) -> Option<&Index> { self._get(reads, || self.index_registry.get_pending(index_name)) } pub fn get_enabled( &self, reads: &mut TransactionReadSet, index_name: &TabletIndexName, ) -> Option<&Index> { self._get(reads, || self.index_registry.get_enabled(index_name)) } fn _get<'a>( &'a self, reads: &mut TransactionReadSet, getter: impl FnOnce() -> Option<&'a Index>, ) -> Option<&'a Index> { let result = getter(); self.record_interval(reads, result); result } pub fn require_enabled( &self, reads: &mut TransactionReadSet, index_name: &TabletIndexName, printable_index_name: &IndexName, ) -> anyhow::Result<Index> { let result = self .index_registry .require_enabled(index_name, printable_index_name)?; self.record_interval(reads, Some(&result)); Ok(result) } fn record_interval(&self, reads: &mut TransactionReadSet, index: Option<&Index>) { let index_table = self.index_registry.index_table(); let index_table_number = self.index_registry.index_table_number(); let interval = match index { // Note there is no _index.by_name index. In order for the // name->index mapping to depend only on index id, we rely // on index name being immutable. Some(index) => { let full_index_id = DeveloperDocumentId::new(index_table_number, index.id()); let index_key = IndexKey::new(vec![], full_index_id); Interval::prefix(index_key.to_bytes().into()) }, // On a name lookup miss, depend on all indexes. None => Interval::all(), }; reads.record_indexed_derived( TabletIndexName::by_id(index_table), IndexedFields::by_id(), interval, ); } /// Returns the snapshot the transaction is based on ignoring any pending /// updates. pub fn base_snapshot(&self) -> &DatabaseIndexSnapshot { &self.database_index_snapshot } pub fn base_snapshot_mut(&mut self) -> &mut DatabaseIndexSnapshot { &mut self.database_index_snapshot } } #[derive(Debug, Clone)] pub struct TransactionIndexMap { /// Unlike IndexMap we can simply use BTreeMap since the TransactionIndexMap /// does not get clones. The value needs to be Option<Document> since we /// need to distinguish between objects deleted within the transaction /// from objects that never existed. inner: BTreeMap<Vec<u8>, Option<PackedDocument>>, } impl TransactionIndexMap { pub fn new() -> Self { Self { inner: BTreeMap::new(), } } pub fn range( &self, interval: &Interval, ) -> impl DoubleEndedIterator<Item = (IndexKeyBytes, Option<ResolvedDocument>)> + '_ { self.inner .range(interval) .map(|(k, v)| (IndexKeyBytes(k.clone()), v.as_ref().map(|v| v.unpack()))) } pub fn insert(&mut self, k: IndexKeyBytes, v: Option<&ResolvedDocument>) { self.inner.insert(k.0, v.map(PackedDocument::pack)); } } pub struct Update<'a> { index: &'a mut TransactionIndex, deletion: Option<ResolvedDocument>, insertion: Option<ResolvedDocument>, registry: IndexRegistry, } impl Update<'_> { pub fn apply(self) -> Vec<DatabaseIndexUpdate> { self.index.finish_update(self.deletion, self.insertion) } pub fn registry(&self) -> &IndexRegistry { &self.registry } } #[async_trait] pub trait TransactionTextSnapshot: Send + Sync + 'static { // Search at the given snapshot after applying the given writes. async fn search( &self, index: &Index, search: &InternalSearch, version: SearchVersion, // Note that we have to send the writes since we maintain an extremely high // bar of determinism - we expect the exact same result regardless if you // perform a query from a mutation with some pending writes, or a query after the // writes have been committed to the database. The easiest way to achieve // this is to send all pending writes back to the backend. This should be fine // in practice since mutations with a lot of writes *and* a lot searches // should be rare. // As a potential future optimization, we could try to make the caller much // more coupled with the search algorithm and require it to send bm25 statistics // diff, top fuzzy search suggestions and other search specific properties derived // from the writes. Alternatively, we could only do subset of that and relax the // determinism requirement since we don't really need to have deterministic between // search calls in mutations and search calls in queries, and if anyone relies on // this they will get random differences due to parallel writes that alter the // statistics anyway. pending_updates: &Vec<DocumentUpdate>, ) -> anyhow::Result<QueryResults>; } #[derive(Clone)] pub struct TextIndexManagerSnapshot { index_registry: IndexRegistry, text_indexes: TextIndexManager, searcher: Arc<dyn Searcher>, search_storage: Arc<OnceLock<Arc<dyn Storage>>>, } impl TextIndexManagerSnapshot { pub fn new( index_registry: IndexRegistry, text_indexes: TextIndexManager, searcher: Arc<dyn Searcher>, search_storage: Arc<OnceLock<Arc<dyn Storage>>>, ) -> Self { Self { index_registry, text_indexes, searcher, search_storage, } } // Applies the writes to the base snapshot and returns the new snapshot. fn snapshot_with_updates( &self, pending_updates: &Vec<DocumentUpdate>, ) -> anyhow::Result<TextIndexManager> { let mut text_indexes = self.text_indexes.clone(); for DocumentUpdate { id: _, old_document, new_document, } in pending_updates { text_indexes.update( &self.index_registry, old_document.as_ref(), new_document.as_ref(), WriteTimestamp::Pending, )?; } Ok(text_indexes) } fn search_storage(&self) -> Arc<dyn Storage> { self.search_storage .get() .expect("search_storage not initialized") .clone() } #[fastrace::trace] pub async fn search_with_compiled_query( &self, index: &Index, printable_index_name: &IndexName, query: pb::searchlight::TextQuery, pending_updates: &Vec<DocumentUpdate>, ) -> anyhow::Result<RevisionWithKeys> { let text_indexes_snapshot = runtime::block_in_place(|| self.snapshot_with_updates(pending_updates))?; text_indexes_snapshot .search_with_compiled_query( index, printable_index_name, query, self.searcher.clone(), self.search_storage(), ) .await } } #[async_trait] impl TransactionTextSnapshot for TextIndexManagerSnapshot { async fn search( &self, index: &Index, search: &InternalSearch, version: SearchVersion, pending_updates: &Vec<DocumentUpdate>, ) -> anyhow::Result<QueryResults> { let text_indexes_snapshot = self.snapshot_with_updates(pending_updates)?; text_indexes_snapshot .search( index, search, self.searcher.clone(), self.search_storage(), version, ) .await } } pub struct SearchNotEnabled; #[async_trait] impl TransactionTextSnapshot for SearchNotEnabled { async fn search( &self, _index: &Index, _search: &InternalSearch, _version: SearchVersion, _pending_updates: &Vec<DocumentUpdate>, ) -> anyhow::Result<QueryResults> { anyhow::bail!("search not implemented in db-info") } } #[cfg(test)] mod tests { use std::{ collections::BTreeMap, str::FromStr, sync::{ Arc, OnceLock, }, }; use common::{ bootstrap_model::index::{ database_index::IndexedFields, IndexMetadata, TabletIndexMetadata, INDEX_TABLE, }, document::{ CreationTime, PackedDocument, ResolvedDocument, }, interval::Interval, persistence::{ now_ts, ConflictStrategy, DocumentLogEntry, Persistence, PersistenceIndexEntry, RepeatablePersistence, }, query::{ CursorPosition, Order, }, testing::{ TestIdGenerator, TestPersistence, }, types::{ unchecked_repeatable_ts, IndexDescriptor, IndexName, PersistenceVersion, TableName, TabletIndexName, Timestamp, WriteTimestamp, }, value::ResolvedDocumentId, }; use indexing::{ backend_in_memory_indexes::{ BackendInMemoryIndexes, DatabaseIndexSnapshot, RangeRequest, }, index_registry::IndexRegistry, }; use itertools::Itertools; use runtime::prod::ProdRuntime; use search::{ searcher::InProcessSearcher, TextIndexManager, TextIndexManagerState, }; use storage::{ LocalDirStorage, Storage, }; use value::assert_obj; use super::TextIndexManagerSnapshot; use crate::{ query::IndexRangeResponse, transaction_index::TransactionIndex, FollowerRetentionManager, }; fn next_document_id( id_generator: &mut TestIdGenerator, table_name: &str, ) -> anyhow::Result<ResolvedDocumentId> { Ok(id_generator.user_generate(&TableName::from_str(table_name)?)) } fn gen_index_document( id_generator: &mut TestIdGenerator, metadata: TabletIndexMetadata, ) -> anyhow::Result<ResolvedDocument> { let index_id = id_generator.system_generate(&INDEX_TABLE); ResolvedDocument::new(index_id, CreationTime::ONE, metadata.try_into()?) } async fn bootstrap_index( id_generator: &mut TestIdGenerator, mut indexes: Vec<TabletIndexMetadata>, persistence: RepeatablePersistence, ) -> anyhow::Result<( IndexRegistry, BackendInMemoryIndexes, TextIndexManager, BTreeMap<TabletIndexName, ResolvedDocumentId>, )> { let mut index_id_by_name = BTreeMap::new(); let mut index_documents = Vec::new(); let index_table = id_generator.system_table_id(&INDEX_TABLE).tablet_id; // Add the _index.by_id index. indexes.push(IndexMetadata::new_enabled( TabletIndexName::by_id(index_table), IndexedFields::by_id(), )); let ts = Timestamp::MIN; for metadata in indexes { let doc = gen_index_document(id_generator, metadata.clone())?; index_id_by_name.insert(metadata.name.clone(), doc.id()); index_documents.push((ts, PackedDocument::pack(&doc))); } let index_registry = IndexRegistry::bootstrap( id_generator, index_documents.iter().map(|(_, d)| d.clone()), PersistenceVersion::default(), )?; let index = BackendInMemoryIndexes::bootstrap(&index_registry, index_documents, ts)?; let search = TextIndexManager::new(TextIndexManagerState::Bootstrapping, persistence.version()); Ok((index_registry, index, search, index_id_by_name)) } #[convex_macro::prod_rt_test] async fn test_transaction_index_missing_index(rt: ProdRuntime) -> anyhow::Result<()> { let mut id_generator = TestIdGenerator::new(); let persistence = Arc::new(TestPersistence::new()); let retention_manager = Arc::new(FollowerRetentionManager::new(rt.clone(), persistence.clone()).await?); // Create a transactions with `by_name` index missing before the transaction // started. let rp = RepeatablePersistence::new( Arc::new(TestPersistence::new()), unchecked_repeatable_ts(Timestamp::must(1000)), retention_manager, ); let ps = rp.read_snapshot(unchecked_repeatable_ts(Timestamp::must(1000)))?; let table_id = id_generator.user_table_id(&"messages".parse()?).tablet_id; let messages_by_name = TabletIndexName::new(table_id, IndexDescriptor::new("by_name")?)?; let printable_messages_by_name = IndexName::new("messages".parse()?, IndexDescriptor::new("by_name")?)?; let (index_registry, inner, search, _) = bootstrap_index( &mut id_generator, vec![IndexMetadata::new_enabled( TabletIndexName::by_id(table_id), IndexedFields::by_id(), )], rp, ) .await?; let searcher = Arc::new(InProcessSearcher::new(rt.clone()).await?); let search_storage = Arc::new(LocalDirStorage::new(rt)?); let mut index = TransactionIndex::new( index_registry.clone(), DatabaseIndexSnapshot::new( index_registry.clone(), Arc::new(inner), id_generator.clone(), ps, ), Arc::new(TextIndexManagerSnapshot::new( index_registry.clone(), search, searcher.clone(), Arc::new(OnceLock::from(search_storage as Arc<dyn Storage>)), )), ); // Query the missing index. It should return an error because index is missing. { let result = index .range(RangeRequest { index_name: messages_by_name.clone(), printable_index_name: printable_messages_by_name.clone(), interval: Interval::all(), order: Order::Asc, max_size: 100, }) .await; assert!(result.is_err()); match result { Ok(_) => panic!("Should have failed!"), Err(ref err) => { assert!( format!("{err:?}").contains("Index messages.by_name not found."), "Actual: {err:?}" ) }, }; } // Add the index. It should start returning errors since the index was not // backfilled at the snapshot. let by_name_metadata = IndexMetadata::new_backfilling( Timestamp::must(1000), messages_by_name.clone(), vec!["name".parse()?].try_into()?, ); let by_name = gen_index_document(&mut id_generator, by_name_metadata)?; index.begin_update(None, Some(by_name))?.apply(); let result = index .range(RangeRequest { index_name: messages_by_name, printable_index_name: printable_messages_by_name, interval: Interval::all(), order: Order::Asc, max_size: 100, }) .await; assert!(result.is_err()); match result { Ok(_) => panic!("Should have failed!"), Err(ref err) => { assert!( format!("{err:?}").contains("Index messages.by_name is currently backfilling"), "Actual: {err:?}" ) }, }; Ok(()) } #[convex_macro::prod_rt_test] async fn test_transaction_index_missing_table(rt: ProdRuntime) -> anyhow::Result<()> { let mut id_generator = TestIdGenerator::new(); let table_id = id_generator.user_table_id(&"messages".parse()?).tablet_id; let by_id = TabletIndexName::by_id(table_id); let printable_by_id = IndexName::by_id("messages".parse()?); let by_name = TabletIndexName::new(table_id, IndexDescriptor::new("by_name")?)?; let printable_by_name = IndexName::new("messages".parse()?, IndexDescriptor::new("by_name")?)?; // Create a transactions with table missing before the transaction started. let persistence = Arc::new(TestPersistence::new()); let persistence_version = persistence.reader().version(); let retention_manager = Arc::new(FollowerRetentionManager::new(rt.clone(), persistence.clone()).await?); let rp = RepeatablePersistence::new( persistence, unchecked_repeatable_ts(Timestamp::must(1000)), retention_manager, ); let ps = rp.read_snapshot(unchecked_repeatable_ts(Timestamp::must(1000)))?; let (index_registry, inner, search, _) = bootstrap_index(&mut id_generator, vec![], rp).await?; let searcher = Arc::new(InProcessSearcher::new(rt.clone()).await?); let search_storage = Arc::new(LocalDirStorage::new(rt)?); let mut index = TransactionIndex::new( index_registry.clone(), DatabaseIndexSnapshot::new( index_registry.clone(), Arc::new(inner), id_generator.clone(), ps, ), Arc::new(TextIndexManagerSnapshot::new( index_registry.clone(), search, searcher.clone(), Arc::new(OnceLock::from(search_storage as Arc<dyn Storage>)), )), ); // Query the missing table using table scan index. It should return no results. let IndexRangeResponse { page: results, cursor, } = index .range(RangeRequest { index_name: by_id.clone(), printable_index_name: printable_by_id.clone(), interval: Interval::all(), order: Order::Asc, max_size: 100, }) .await?; assert!(matches!(cursor, CursorPosition::End)); assert!(results.is_empty()); // Query by any other index should return an error. { let result = index .range(RangeRequest { index_name: by_name, printable_index_name: printable_by_name, interval: Interval::all(), order: Order::Asc, max_size: 100, }) .await; assert!(result.is_err()); match result { Ok(_) => panic!("Should have failed!"), Err(ref err) => { assert!(format!("{err:?}").contains("Index messages.by_name not found."),) }, }; } // Add the table scan index. It should still give no results. let metadata = IndexMetadata::new_enabled(by_id.clone(), IndexedFields::by_id()); let by_id_index = gen_index_document(&mut id_generator, metadata.clone())?; index.begin_update(None, Some(by_id_index))?.apply(); let IndexRangeResponse { page: results, cursor, } = index .range(RangeRequest { index_name: by_id.clone(), printable_index_name: printable_by_id.clone(), interval: Interval::all(), order: Order::Asc, max_size: 100, }) .await?; assert!(matches!(cursor, CursorPosition::End)); assert!(results.is_empty()); // Add a document and make sure we see it. let doc = ResolvedDocument::new( next_document_id(&mut id_generator, "messages")?, CreationTime::ONE, assert_obj!( "content" => "hello there!", ), )?; index.begin_update(None, Some(doc.clone()))?.apply(); let IndexRangeResponse { page: result, cursor, } = index .range(RangeRequest { index_name: by_id, printable_index_name: printable_by_id, interval: Interval::all(), order: Order::Asc, max_size: 100, }) .await?; assert_eq!( result, vec![( doc.index_key(&IndexedFields::by_id()[..], persistence_version) .to_bytes(), doc, WriteTimestamp::Pending )], ); assert!(matches!(cursor, CursorPosition::End)); Ok(()) } #[convex_macro::prod_rt_test] async fn test_transaction_index_merge(rt: ProdRuntime) -> anyhow::Result<()> { let mut id_generator = TestIdGenerator::new(); let by_id_fields = vec![]; let by_name_fields = vec!["name".parse()?]; let now0 = now_ts(Timestamp::MIN, &rt)?; let ps = Arc::new(TestPersistence::new()); let persistence_version = ps.reader().version(); let retention_manager = Arc::new(FollowerRetentionManager::new(rt.clone(), ps.clone()).await?); let rp = RepeatablePersistence::new( ps.reader(), unchecked_repeatable_ts(now0), retention_manager.clone(), ); let table: TableName = "users".parse()?; let table_id = id_generator.user_table_id(&table).tablet_id; let by_id = TabletIndexName::by_id(table_id); let printable_by_id = IndexName::by_id(table.clone()); let by_name = TabletIndexName::new(table_id, IndexDescriptor::new("by_name")?)?; let printable_by_name = IndexName::new(table.clone(), IndexDescriptor::new("by_name")?)?; let (mut index_registry, mut index, search, _index_ids) = bootstrap_index( &mut id_generator, vec![ IndexMetadata::new_enabled(by_id.clone(), by_id_fields.clone().try_into()?), IndexMetadata::new_enabled(by_name.clone(), by_name_fields.clone().try_into()?), ], rp, ) .await?; async fn add( index_registry: &mut IndexRegistry, index: &mut BackendInMemoryIndexes, ps: &TestPersistence, ts: Timestamp, doc: ResolvedDocument, ) -> anyhow::Result<()> { index_registry.update(None, Some(&doc))?; let index_updates = index.update(index_registry, ts, None, Some(doc.clone())); ps.write( &[(DocumentLogEntry { ts, id: doc.id_with_table_id(), value: Some(doc.clone()), prev_ts: None, })], &index_updates .into_iter() .map(|u| PersistenceIndexEntry::from_index_update(ts, &u)) .collect_vec(), ConflictStrategy::Error, ) .await?; Ok(()) } // Add "Alice", "Bob" and "Zack" let alice = ResolvedDocument::new( next_document_id(&mut id_generator, "users")?, CreationTime::ONE, assert_obj!( "name" => "alice", ), )?; let now1 = now0.succ()?; add(&mut index_registry, &mut index, &ps, now1, alice.clone()).await?; let bob = ResolvedDocument::new( next_document_id(&mut id_generator, "users")?, CreationTime::ONE, assert_obj!( "name" => "bob", ), )?; let now2 = now1.succ()?; add(&mut index_registry, &mut index, &ps, now2, bob.clone()).await?; let zack = ResolvedDocument::new( next_document_id(&mut id_generator, "users")?, CreationTime::ONE, assert_obj!( "name" => "zack", ), )?; let now3 = now2.succ()?; add(&mut index_registry, &mut index, &ps, now3, zack.clone()).await?; id_generator.write_tables(ps.clone()).await?; let now4 = now3.succ()?; // Start a transaction, add "David" and delete "Bob" let ps = RepeatablePersistence::new(ps, unchecked_repeatable_ts(now4), retention_manager) .read_snapshot(unchecked_repeatable_ts(now4))?; let searcher = Arc::new(InProcessSearcher::new(rt.clone()).await?); let search_storage = Arc::new(LocalDirStorage::new(rt.clone())?); let mut index = TransactionIndex::new( index_registry.clone(), DatabaseIndexSnapshot::new( index_registry.clone(), Arc::new(index), id_generator.clone(), ps, ), Arc::new(TextIndexManagerSnapshot::new( index_registry.clone(), search, searcher.clone(), Arc::new(OnceLock::from(search_storage as Arc<dyn Storage>)), )), ); let david = ResolvedDocument::new( next_document_id(&mut id_generator, "users")?, CreationTime::ONE, assert_obj!("name" => "david"), )?; index.begin_update(None, Some(david.clone()))?.apply(); index.begin_update(Some(bob), None)?.apply(); // Query by id let IndexRangeResponse { page: results, cursor, } = index .range(RangeRequest { index_name: by_id.clone(), printable_index_name: printable_by_id, interval: Interval::all(), order: Order::Asc, max_size: 100, }) .await?; assert!(matches!(cursor, CursorPosition::End)); assert_eq!( results, vec![ ( alice .index_key(&by_id_fields[..], persistence_version) .to_bytes(), alice.clone(), WriteTimestamp::Committed(now1) ), ( zack.index_key(&by_id_fields[..], persistence_version) .to_bytes(), zack.clone(), WriteTimestamp::Committed(now3) ), ( david .index_key(&by_id_fields[..], persistence_version) .to_bytes(), david.clone(), WriteTimestamp::Pending ), ] ); // Query by name in ascending order let IndexRangeResponse { page: results, cursor, } = index .range(RangeRequest { index_name: by_name.clone(), printable_index_name: printable_by_name.clone(), interval: Interval::all(), order: Order::Asc, max_size: 100, }) .await?; assert!(matches!(cursor, CursorPosition::End)); assert_eq!( results, vec![ ( alice .index_key(&by_name_fields[..], persistence_version) .to_bytes(), alice.clone(), WriteTimestamp::Committed(now1) ), ( david .index_key(&by_name_fields[..], persistence_version) .to_bytes(), david.clone(), WriteTimestamp::Pending ), ( zack.index_key(&by_name_fields[..], persistence_version) .to_bytes(), zack.clone(), WriteTimestamp::Committed(now3) ), ] ); // Query by name in ascending order with limit=2. // Returned cursor should be After("david"). let IndexRangeResponse { page: results, cursor, } = index .range(RangeRequest { index_name: by_name.clone(), printable_index_name: printable_by_name.clone(), interval: Interval::all(), order: Order::Asc, max_size: 2, }) .await?; assert_eq!( cursor, CursorPosition::After( david .index_key(&by_name_fields[..], persistence_version) .to_bytes() ) ); assert_eq!( results, vec![ ( alice .index_key(&by_name_fields[..], persistence_version) .to_bytes(), alice.clone(), WriteTimestamp::Committed(now1) ), ( david .index_key(&by_name_fields[..], persistence_version) .to_bytes(), david.clone(), WriteTimestamp::Pending ), ] ); // Query by name in descending order let IndexRangeResponse { page: result, cursor, } = index .range(RangeRequest { index_name: by_name, printable_index_name: printable_by_name, interval: Interval::all(), order: Order::Desc, max_size: 100, }) .await?; assert!(matches!(cursor, CursorPosition::End)); assert_eq!( result, vec![ ( zack.index_key(&by_name_fields[..], persistence_version) .to_bytes(), zack, WriteTimestamp::Committed(now3) ), ( david .index_key(&by_name_fields[..], persistence_version) .to_bytes(), david, WriteTimestamp::Pending ), ( alice .index_key(&by_name_fields[..], persistence_version) .to_bytes(), alice, WriteTimestamp::Committed(now1) ), ] ); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server