Skip to main content
Glama

Convex MCP server

Official
by get-convex
flusher.rs38.3 kB
use std::sync::Arc; use common::{ knobs::SEARCH_INDEX_SIZE_SOFT_LIMIT, persistence::PersistenceReader, runtime::Runtime, }; use search::searcher::SegmentTermMetadataFetcher; use storage::Storage; use crate::{ search_index_workers::{ search_flusher::{ SearchFlusher, SearchIndexLimits, }, writer::SearchIndexMetadataWriter, FlusherType, }, text_index_worker::text_meta::{ BuildTextIndexArgs, TextSearchIndex, }, Database, }; #[cfg(any(test, feature = "testing"))] pub async fn backfill_text_indexes<RT: Runtime>( runtime: RT, database: Database<RT>, reader: Arc<dyn PersistenceReader>, storage: Arc<dyn Storage>, segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>, ) -> anyhow::Result<()> { let writer = SearchIndexMetadataWriter::new( runtime.clone(), database.clone(), reader.clone(), storage.clone(), BuildTextIndexArgs { search_storage: storage.clone(), segment_term_metadata_fetcher: segment_term_metadata_fetcher.clone(), }, ); let flusher = FlusherBuilder::new( runtime.clone(), database.clone(), reader.clone(), storage.clone(), segment_term_metadata_fetcher.clone(), writer.clone(), FlusherType::Backfill, ) .set_soft_limit(0) .build(); flusher.step().await?; let flusher = FlusherBuilder::new( runtime, database, reader, storage, segment_term_metadata_fetcher, writer, FlusherType::LiveFlush, ) .set_soft_limit(0) .build(); flusher.step().await?; Ok(()) } pub(crate) struct FlusherBuilder<RT: Runtime> { runtime: RT, database: Database<RT>, reader: Arc<dyn PersistenceReader>, storage: Arc<dyn Storage>, segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>, limits: SearchIndexLimits, writer: SearchIndexMetadataWriter<RT, TextSearchIndex>, flusher_type: FlusherType, } impl<RT: Runtime> FlusherBuilder<RT> { pub(crate) fn new( runtime: RT, database: Database<RT>, reader: Arc<dyn PersistenceReader>, storage: Arc<dyn Storage>, segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>, writer: SearchIndexMetadataWriter<RT, TextSearchIndex>, flusher_type: FlusherType, ) -> Self { Self { runtime, database, reader, storage, segment_term_metadata_fetcher, writer, limits: SearchIndexLimits { index_size_soft_limit: *SEARCH_INDEX_SIZE_SOFT_LIMIT, incremental_multipart_threshold_bytes: *SEARCH_INDEX_SIZE_SOFT_LIMIT, }, flusher_type, } } #[cfg(any(test, feature = "testing"))] pub fn set_soft_limit(self, limit: usize) -> Self { Self { limits: SearchIndexLimits { index_size_soft_limit: limit, ..self.limits }, ..self } } #[cfg(any(test, feature = "testing"))] #[cfg_attr(not(test), expect(dead_code))] pub fn set_incremental_multipart_threshold_bytes(self, limit: usize) -> Self { Self { limits: SearchIndexLimits { incremental_multipart_threshold_bytes: limit, ..self.limits }, ..self } } #[cfg(any(test, feature = "testing"))] #[cfg_attr(not(test), expect(dead_code))] pub fn set_live_flush(self) -> Self { Self { flusher_type: FlusherType::LiveFlush, ..self } } pub(crate) fn build(self) -> TextIndexFlusher<RT> { SearchFlusher::new( self.runtime, self.database, self.reader, self.storage.clone(), self.limits, self.writer, BuildTextIndexArgs { search_storage: self.storage.clone(), segment_term_metadata_fetcher: self.segment_term_metadata_fetcher.clone(), }, self.flusher_type, ) } } pub type TextIndexFlusher<RT> = SearchFlusher<RT, TextSearchIndex>; #[allow(unused)] #[cfg(any(test, feature = "testing"))] pub fn new_text_flusher_for_tests<RT: Runtime>( runtime: RT, database: Database<RT>, reader: Arc<dyn PersistenceReader>, storage: Arc<dyn Storage>, segment_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>, ) -> TextIndexFlusher<RT> { let writer = SearchIndexMetadataWriter::new( runtime.clone(), database.clone(), reader.clone(), storage.clone(), BuildTextIndexArgs { search_storage: storage.clone(), segment_term_metadata_fetcher: segment_metadata_fetcher.clone(), }, ); FlusherBuilder::new( runtime, database, reader, storage, segment_metadata_fetcher, writer, FlusherType::Backfill, ) .build() } pub(crate) fn new_text_flusher<RT: Runtime>( runtime: RT, database: Database<RT>, reader: Arc<dyn PersistenceReader>, storage: Arc<dyn Storage>, segment_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>, writer: SearchIndexMetadataWriter<RT, TextSearchIndex>, flusher_type: FlusherType, ) -> TextIndexFlusher<RT> { FlusherBuilder::new( runtime, database, reader, storage, segment_metadata_fetcher, writer, flusher_type, ) .build() } #[cfg(test)] mod tests { use std::time::Duration; use anyhow::Context; use common::{ bootstrap_model::index::{ text_index::{ TextIndexSnapshot, TextIndexState, }, IndexConfig, IndexMetadata, }, pause::PauseController, runtime::testing::TestRuntime, types::{ IndexName, TabletIndexName, }, }; use maplit::btreemap; use must_let::must_let; use sync_types::Timestamp; use value::{ assert_obj, TableNamespace, }; use crate::{ search_index_workers::{ search_compactor::CompactionConfig, search_flusher::FLUSH_RUNNING_LABEL, }, tests::text_test_utils::{ add_document, IndexData, TextFixtures, }, Database, IndexModel, TestFacingModel, }; async fn assert_snapshotted( database: &Database<TestRuntime>, namespace: TableNamespace, index_name: &IndexName, ) -> anyhow::Result<Timestamp> { let mut tx = database.begin_system().await?; let new_metadata = IndexModel::new(&mut tx) .enabled_index_metadata(namespace, index_name)? .context("Index missing or in an unexpected state")? .into_value(); must_let!(let IndexMetadata { config: IndexConfig::Text { on_disk_state: TextIndexState::SnapshottedAt(TextIndexSnapshot { ts, .. }), .. }, .. } = new_metadata); Ok(ts) } async fn enable_pending_index( database: &Database<TestRuntime>, namespace: TableNamespace, index_name: &IndexName, ) -> anyhow::Result<()> { let mut tx = database.begin_system().await.unwrap(); let mut model = IndexModel::new(&mut tx); let index = model .pending_index_metadata(namespace, index_name)? .context(format!("Missing pending index for {index_name:?}"))?; model.enable_backfilled_indexes(vec![index]).await?; database.commit(tx).await?; Ok(()) } #[convex_macro::test_runtime] async fn test_build_search_index(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt.clone()).await?; let IndexData { index_name, resolved_index_name, .. } = fixtures .insert_backfilling_text_index_with_document() .await?; let worker = fixtures.new_backfill_text_flusher(); // Run one interation of the search index worker. let (metrics, _) = worker.step().await?; // Make sure we actually built this index with one document. assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1}); // Check that the metadata is updated so it's no longer backfilling. fixtures.assert_backfilled(&index_name).await?; Ok(()) } #[convex_macro::test_runtime] async fn test_rebuild_backfilled_search_index(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt.clone()).await?; let database = &fixtures.db; let IndexData { index_name, resolved_index_name, .. } = fixtures .insert_backfilling_text_index_with_document() .await?; let worker = fixtures.new_backfill_text_flusher(); // Run one interation of the search index worker. let (metrics, _) = worker.step().await?; // Make sure we actually built this index with one document. assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1}); // Check that the metadata is updated so it's no longer backfilling. let initial_snapshot_ts = fixtures.assert_backfilled(&index_name).await?; // Write 10 more documents into the table to trigger a new snapshot. let mut tx = database.begin_system().await.unwrap(); let num_new_documents = 10; for _ in 0..num_new_documents { add_document( &mut tx, index_name.table(), "hello world, this is a message with more than just a few terms in it", ) .await?; } database.commit(tx).await?; let worker = fixtures.new_live_text_flusher(); let (metrics, _) = worker.step().await?; assert_eq!( metrics, btreemap! {resolved_index_name.clone() => num_new_documents} ); // Check that the metadata is updated so it's no longer backfilling. let new_snapshot_ts = fixtures.assert_backfilled(&index_name).await?; assert!(new_snapshot_ts > initial_snapshot_ts); Ok(()) } #[convex_macro::test_runtime] async fn test_rebuild_enabled_search_index(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt.clone()).await?; let IndexData { index_name, resolved_index_name, namespace, .. } = fixtures .insert_backfilling_text_index_with_document() .await?; let worker = fixtures.new_backfill_text_flusher(); // Run one interation of the search index worker. let (metrics, _) = worker.step().await?; // Make sure we actually built this index with one document. assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1}); // Check that the metadata is updated so it's no longer backfilling. let initial_snapshot_ts = fixtures.assert_backfilled(&index_name).await?; // Enable the index so it's in the Snapshotted state. enable_pending_index(&fixtures.db, namespace, &index_name).await?; // Write 10 more documents into the table to trigger a new snapshot. let mut tx = fixtures.db.begin_system().await.unwrap(); let num_new_documents = 10; for _ in 0..num_new_documents { add_document( &mut tx, index_name.table(), "hello world, this is a message with more than just a few terms in it", ) .await?; } fixtures.db.commit(tx).await?; let worker = fixtures.new_live_text_flusher(); let (metrics, _) = worker.step().await?; assert_eq!( metrics, btreemap! {resolved_index_name.clone() => num_new_documents} ); // Check that the metadata is updated and still enabled. let new_snapshot_ts = assert_snapshotted(&fixtures.db, namespace, &index_name).await?; assert!(new_snapshot_ts > initial_snapshot_ts); Ok(()) } #[convex_macro::test_runtime] async fn test_advance_old_snapshot(rt: TestRuntime) -> anyhow::Result<()> { common::testing::init_test_logging(); let fixtures = TextFixtures::new(rt.clone()).await?; let worker = fixtures.new_backfill_flusher_with_soft_limit(); let database = &fixtures.db; let IndexData { index_name, resolved_index_name, .. } = fixtures .insert_backfilling_text_index_with_document() .await?; let (metrics, _) = worker.step().await?; assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1}); let initial_snapshot_ts = fixtures.assert_backfilled(&index_name).await?; // Write a single document underneath our soft limit and check that we don't // snapshot. let mut tx = database.begin_system().await?; add_document(&mut tx, index_name.table(), "too small to count").await?; database.commit(tx).await?; let worker = fixtures.new_live_flusher_with_soft_limit(); let (metrics, _) = worker.step().await?; assert!(metrics.is_empty()); assert_eq!( initial_snapshot_ts, fixtures.assert_backfilled(&index_name).await? ); // Advance time past the max index age (and do an unrelated commit to bump the // repeatable timestamp). rt.advance_time(Duration::from_secs(7200)).await; let mut tx = database.begin_system().await?; let unrelated_document = assert_obj!("wise" => "ambience"); TestFacingModel::new(&mut tx) .insert(&"unrelated".parse()?, unrelated_document) .await?; database.commit(tx).await?; let (metrics, _) = worker.step().await?; assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1}); assert!(initial_snapshot_ts < fixtures.assert_backfilled(&index_name).await?); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_no_documents_sets_state_to_backfilled( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let index_data = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; fixtures.assert_backfilled(&index_data.index_name).await?; Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_no_documents_returns_index_in_metrics( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let index_data = fixtures.insert_backfilling_text_index().await?; let mut tx = fixtures.db.begin_system().await?; let table_id = tx .table_mapping() .namespace(TableNamespace::test_user()) .id(index_data.index_name.table())? .tablet_id; let resolved_index_name = TabletIndexName::new(table_id, index_data.index_name.descriptor().clone())?; let flusher = fixtures.new_backfill_text_flusher(); let (metrics, _) = flusher.step().await?; assert_eq!(metrics, btreemap! { resolved_index_name => 0 }); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_one_document_sets_state_to_backfilled( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let index_data = fixtures .insert_backfilling_text_index_with_document() .await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; fixtures.assert_backfilled(&index_data.index_name).await?; Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_one_document_returns_metrics(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { resolved_index_name, .. } = fixtures .insert_backfilling_text_index_with_document() .await?; let flusher = fixtures.new_backfill_text_flusher(); let (metrics, _) = flusher.step().await?; assert_eq!(metrics, btreemap! { resolved_index_name => 1 }); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_one_document_writes_document(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let index_data = fixtures.insert_backfilling_text_index().await?; let doc_id = fixtures.add_document("cat").await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; fixtures.enable_index(&index_data.index_name).await?; let results = fixtures.search(index_data.index_name, "cat").await?; assert_eq!(results.first().unwrap().id(), doc_id); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_two_documents_0_max_segment_size_creates_two_segments( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, index_id, .. } = fixtures.insert_backfilling_text_index().await?; fixtures.add_document("some text").await?; fixtures.add_document("some other text").await?; let flusher = fixtures .new_search_flusher_builder() .set_incremental_multipart_threshold_bytes(0) .build(); // Build the first segment, which stops because the document size is > 0 flusher.step().await?; // Should have written backfill progress, and it is halfway done. let progress = fixtures .index_backfill_progress(index_id.developer_id) .await? .unwrap(); assert_eq!(progress.num_docs_indexed, 1); assert_eq!(progress.total_docs, Some(2)); // Build the second segment and finalize the index metadata. flusher.step().await?; // Should have written backfill progress, and it is complete. let progress = fixtures .index_backfill_progress(index_id.developer_id) .await? .unwrap(); assert_eq!(progress.num_docs_indexed, 2); assert_eq!(progress.total_docs, Some(2)); let segments = fixtures.get_segments_metadata(index_name).await?; assert_eq!(segments.len(), 2); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_two_documents_leaves_document_backfilling_after_first_flush( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let index_data = fixtures.insert_backfilling_text_index().await?; fixtures.add_document("cat").await?; fixtures.add_document("dog").await?; let flusher = fixtures .new_search_flusher_builder() .set_incremental_multipart_threshold_bytes(0) .build(); // Build the first segment, which stops because the document size is > 0 flusher.step().await?; let metadata = fixtures.get_index_metadata(index_data.index_name).await?; must_let!(let IndexConfig::Text { on_disk_state, .. }= &metadata.config); must_let!(let TextIndexState::Backfilling(backfilling_meta) = on_disk_state); assert_eq!(backfilling_meta.segments.len(), 1); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_two_documents_0_max_segment_size_includes_both_documents( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let cat_doc_id = fixtures.add_document("cat").await?; let dog_doc_id = fixtures.add_document("dog").await?; let flusher = fixtures .new_search_flusher_builder() .set_incremental_multipart_threshold_bytes(0) .build(); // Build the first segment, which stops because the document size is > 0 flusher.step().await?; // Build the second segment and finalize the index metadata. flusher.step().await?; fixtures.enable_index(&index_name).await?; let cat_results = fixtures.search(index_name.clone(), "cat").await?; assert_eq!(cat_results.first().unwrap().id(), cat_doc_id); let dog_results = fixtures.search(index_name, "dog").await?; assert_eq!(dog_results.first().unwrap().id(), dog_doc_id); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_empty_index_adds_no_segments(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; let segments = fixtures.get_segments_metadata(index_name).await?; assert_eq!(0, segments.len()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_empty_backfilled_index_new_document_adds_document( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name, "cat").await?; assert_eq!(doc_id, results.first().unwrap().id()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_non_empty_backfilled_index_new_document_adds_document( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; fixtures.add_document("dog").await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name, "cat").await?; assert_eq!(doc_id, results.first().unwrap().id()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_empty_enabled_index_new_document_adds_document( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; fixtures.enable_index(&index_name).await?; let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; let results = fixtures.search(index_name, "cat").await?; assert_eq!(doc_id, results.first().unwrap().id()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_non_empty_enabled_index_new_document_adds_document( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; fixtures.add_document("dog").await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; fixtures.enable_index(&index_name).await?; let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; let results = fixtures.search(index_name, "cat").await?; assert_eq!(doc_id, results.first().unwrap().id()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_non_empty_enabled_index_new_document_adds_new_segment( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; fixtures.add_document("dog").await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; fixtures.enable_index(&index_name).await?; fixtures.add_document("cat").await?; let flusher = fixtures.new_live_text_flusher(); flusher.step().await?; let segments = fixtures.get_segments_metadata(index_name).await?; assert_eq!(segments.len(), 2); Ok(()) } #[convex_macro::test_runtime] async fn backfill_with_non_empty_backfilled_index_new_document_adds_new_segment( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; fixtures.add_document("dog").await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; fixtures.add_document("cat").await?; let flusher = fixtures.new_live_text_flusher(); flusher.step().await?; fixtures.enable_index(&index_name).await?; let segments = fixtures.get_segments_metadata(index_name).await?; assert_eq!(segments.len(), 2); Ok(()) } #[convex_macro::test_runtime] async fn backfill_one_doc_added_then_deleted_single_build_does_not_include_deleted_document( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); flusher.step().await?; let doc_id = fixtures.add_document("cat").await?; let mut tx = fixtures.db.begin_system().await?; tx.delete_inner(doc_id).await?; fixtures.db.commit(tx).await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name, "cat").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_one_doc_added_then_deleted_separate_builds_does_not_include_deleted_document( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; let mut tx = fixtures.db.begin_system().await?; tx.delete_inner(doc_id).await?; fixtures.db.commit(tx).await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name, "cat").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_one_doc_added_then_replaced_separate_builds_does_not_include_first_document( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; let mut tx = fixtures.db.begin_system().await?; tx.replace_inner(doc_id, assert_obj!()).await?; fixtures.db.commit(tx).await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name, "cat").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_insert_replace_one_segment(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; fixtures.replace_document(doc_id, "new_text").await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name.clone(), "cat").await?; assert!(results.is_empty()); let results = fixtures.search(index_name, "new_text").await?; assert!(!results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_insert_replace_delete_one_segment(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; fixtures.replace_document(doc_id, "new_text").await?; let mut tx = fixtures.db.begin_system().await?; tx.delete_inner(doc_id).await?; fixtures.db.commit(tx).await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name.clone(), "cat").await?; assert!(results.is_empty()); let results = fixtures.search(index_name, "new_text").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_insert_then_replace_delete_separate_segment( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; fixtures.replace_document(doc_id, "new_text").await?; let mut tx = fixtures.db.begin_system().await?; tx.delete_inner(doc_id).await?; fixtures.db.commit(tx).await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name.clone(), "cat").await?; assert!(results.is_empty()); let results = fixtures.search(index_name, "new_text").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_insert_then_replace_delete_separate_segment_many_replaces( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; fixtures.replace_document(doc_id, "dog").await?; flusher.step().await?; fixtures.replace_document(doc_id, "newer_text").await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name.clone(), "cat").await?; assert!(results.is_empty()); let results = fixtures.search(index_name.clone(), "dog").await?; assert!(results.is_empty()); let results = fixtures.search(index_name, "newer_text").await?; assert!(!results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_insert_then_replace_delete_second_segment( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; fixtures.replace_document(doc_id, "new_text").await?; let mut tx = fixtures.db.begin_system().await?; tx.delete_inner(doc_id).await?; fixtures.db.commit(tx).await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name.clone(), "cat").await?; assert!(results.is_empty()); let results = fixtures.search(index_name, "new_text").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_insert_then_replace_delete_separate_segments( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; fixtures.replace_document(doc_id, "new_text").await?; flusher.step().await?; let mut tx = fixtures.db.begin_system().await?; tx.delete_inner(doc_id).await?; fixtures.db.commit(tx).await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name.clone(), "cat").await?; assert!(results.is_empty()); let results = fixtures.search(index_name, "new_text").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_insert_replace_replace_delete_single_segment( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; fixtures.replace_document(doc_id, "new_text").await?; fixtures.replace_document(doc_id, "really_new_text").await?; let mut tx = fixtures.db.begin_system().await?; tx.delete_inner(doc_id).await?; fixtures.db.commit(tx).await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name, "really_new_text").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn backfill_insert_replace_replace_delete_different_segments( rt: TestRuntime, ) -> anyhow::Result<()> { let fixtures = TextFixtures::new(rt).await?; let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?; let flusher = fixtures.new_backfill_text_flusher(); let doc_id = fixtures.add_document("cat").await?; flusher.step().await?; fixtures.replace_document(doc_id, "new_text").await?; flusher.step().await?; fixtures.replace_document(doc_id, "really_new_text").await?; flusher.step().await?; let mut tx = fixtures.db.begin_system().await?; tx.delete_inner(doc_id).await?; fixtures.db.commit(tx).await?; flusher.step().await?; fixtures.enable_index(&index_name).await?; let results = fixtures.search(index_name, "really_new_text").await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn concurrent_compaction_and_flush_new_segment_propagates_deletes( rt: TestRuntime, pause: PauseController, ) -> anyhow::Result<()> { let config = CompactionConfig::default(); let min_compaction_segments = config.min_compaction_segments; let config = CompactionConfig { // Treat everything as a large segment small_segment_threshold_bytes: 0, ..config }; let fixtures = TextFixtures::new_with_config(rt.clone(), config).await?; let index_data = fixtures.enabled_text_index().await?; let IndexData { index_name, .. } = index_data; // Create enough segments to trigger compaction. let mut deleted_doc_ids = vec![]; for _ in 0..min_compaction_segments { deleted_doc_ids.push(fixtures.add_document("test").await?); fixtures.backfill().await?; } // Queue up deletes for all existing segments, and one new document that will // cause the flusher to write a new segment. for doc_id in &deleted_doc_ids { fixtures.replace_document(*doc_id, "updated").await?; } let _non_deleted_id = fixtures.add_document("test").await?; // Run the compactor / flusher concurrently in a way where the compactor // wins the race. fixtures .run_compaction_during_flush(pause, FLUSH_RUNNING_LABEL) .await?; // Verify we propagate the new deletes to the compacted segment and retain our // new segment. let segments = fixtures.get_segments_metadata(index_name).await?; assert_eq!(2, segments.len()); let (compacted_segment, new_segment): (Vec<_>, Vec<_>) = segments .into_iter() .partition(|segment| segment.num_deleted_documents > 0); assert_eq!(compacted_segment.len(), 1); assert_eq!(new_segment.len(), 1); // TODO Verify segment contents Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server