Skip to main content
Glama

Convex MCP server

Official
by get-convex
randomized_search_tests.rs45.8 kB
use std::{ collections::{ btree_map::Entry, BTreeMap, BTreeSet, }, fmt::{ Display, Formatter, }, ops::Range, sync::Arc, }; use async_trait::async_trait; use cmd_util::env::env_config; use common::{ bootstrap_model::index::{ text_index::FragmentedTextSegment, vector_index::FragmentedVectorSegment, IndexMetadata, }, floating_point::assert_approx_equal, knobs::DATABASE_WORKERS_MAX_CHECKPOINT_AGE, pause::PauseController, persistence::Persistence, query::{ CursorPosition, Query, QueryOperator, QuerySource, Search, SearchFilterExpression, SearchVersion, }, types::{ IndexDescriptor, IndexName, Timestamp, }, value::{ sorting::{ sorting_decode::bytes_to_values, TotalOrdF64, }, ConvexValue, ResolvedDocumentId, TableName, }, version::MIN_NPM_VERSION_FOR_FUZZY_SEARCH, }; use errors::ErrorMetadataAnyhowExt; use futures::{ future::BoxFuture, pin_mut, select_biased, FutureExt, }; use keybroker::Identity; use maplit::btreeset; use must_let::must_let; use pb::searchlight::FragmentedVectorSegmentPaths; use proptest::prelude::*; use proptest_derive::Arbitrary; use runtime::testing::{ TestDriver, TestRuntime, }; use search::{ searcher::{ Bm25Stats, FragmentedTextStorageKeys, InProcessSearcher, PostingListMatch, PostingListQuery, Term, TokenMatch, TokenQuery, }, Searcher, MAX_CANDIDATE_REVISIONS, }; use storage::Storage; use usage_tracking::FunctionUsageTracker; use value::{ assert_obj, TableNamespace, }; use vector::{ CompiledVectorSearch, QdrantSchema, VectorSearchQueryResult, VectorSearcher, }; use crate::{ committer::AFTER_PENDING_WRITE_SNAPSHOT, query::{ PaginationOptions, TableFilter, }, search_index_bootstrap::FINISHED_BOOTSTRAP_UPDATES, search_index_workers::{ search_compactor::CompactionConfig, writer::SearchIndexMetadataWriter, FlusherType, }, test_helpers::{ DbFixtures, DbFixturesArgs, }, text_index_worker::{ compactor::new_text_compactor, flusher::new_text_flusher, BuildTextIndexArgs, }, Database, IndexModel, ResolvedQuery, TableModel, TestFacingModel, Transaction, UserFacingModel, }; #[derive(Clone)] struct Scenario { rt: TestRuntime, database: Database<TestRuntime>, search_storage: Arc<dyn Storage>, searcher: Arc<dyn Searcher>, build_index_args: BuildTextIndexArgs, // Add test persistence here, or just change everything to use db fixtures. tp: Arc<dyn Persistence>, table_name: TableName, namespace: TableNamespace, index_id: ResolvedDocumentId, // Store a simple mapping of a test string to an array of test // strings (the search field) and a filter field model: BTreeMap<String, (ResolvedDocumentId, String, String)>, } impl Scenario { async fn new(rt: TestRuntime) -> anyhow::Result<Self> { Self::new_with_searcher(rt.clone(), InProcessSearcher::new(rt).await?).await } async fn new_with_searcher(rt: TestRuntime, searcher: impl Searcher) -> anyhow::Result<Self> { let DbFixtures { db: database, search_storage, searcher, tp, build_index_args, .. } = DbFixtures::new_with_args( &rt, DbFixturesArgs { searcher: Some(Arc::new(searcher)), ..Default::default() }, ) .await?; let table_name: TableName = "test".parse()?; let namespace = TableNamespace::test_user(); let mut tx = database.begin(Identity::system()).await?; TableModel::new(&mut tx) .insert_table_metadata_for_test(TableNamespace::test_user(), &table_name) .await?; let index = IndexMetadata::new_backfilling_text_index( "test.by_text".parse()?, "searchField".parse()?, btreeset! {"filterField".parse()?}, ); let index_id = IndexModel::new(&mut tx) .add_application_index(namespace, index) .await?; database.commit(tx).await?; let mut self_ = Self { rt, database, search_storage, searcher, tp, build_index_args, table_name, namespace, index_id, model: BTreeMap::new(), }; self_.backfill().await?; self_.enable_index().await?; Ok(self_) } async fn set_bootstrapping(&mut self) -> anyhow::Result<()> { let DbFixtures { db, searcher, search_storage, tp, build_index_args, .. } = DbFixtures::new_with_args( &self.rt, DbFixturesArgs { tp: Some(self.tp.clone()), searcher: Some(self.searcher.clone()), search_storage: Some(self.search_storage.clone()), bootstrap_search_and_vector_indexes: false, ..Default::default() }, ) .await?; self.database = db; self.searcher = searcher; self.search_storage = search_storage; self.tp = tp; self.build_index_args = build_index_args; Ok(()) } async fn compact(&mut self) -> anyhow::Result<()> { let writer = SearchIndexMetadataWriter::new( self.rt.clone(), self.database.clone(), self.tp.reader(), self.search_storage.clone(), self.build_index_args.clone(), ); new_text_compactor( self.database.clone(), self.searcher.clone(), self.search_storage.clone(), CompactionConfig::default(), writer, ) .step() .await?; Ok(()) } async fn backfill(&mut self) -> anyhow::Result<()> { let writer = SearchIndexMetadataWriter::new( self.rt.clone(), self.database.clone(), self.tp.reader(), self.search_storage.clone(), self.build_index_args.clone(), ); let flusher = new_text_flusher( self.rt.clone(), self.database.clone(), self.tp.reader(), self.search_storage.clone(), self.build_index_args.segment_term_metadata_fetcher.clone(), writer, FlusherType::Backfill, ); flusher.step().await?; Ok(()) } async fn enable_index(&mut self) -> anyhow::Result<()> { let mut txn = self.database.begin_system().await?; IndexModel::new(&mut txn) .enable_index_for_testing( self.namespace, &IndexName::new("test".parse()?, IndexDescriptor::new("by_text")?)?, ) .await?; self.database.commit(txn).await?; Ok(()) } async fn query_in_tx<S: Into<String>>( &self, tx: &mut Transaction<TestRuntime>, query_string: S, filter: Option<String>, version: SearchVersion, ) -> anyhow::Result<Vec<(ResolvedDocumentId, f64)>> { let mut filters = vec![SearchFilterExpression::Search( "searchField".parse()?, query_string.into(), )]; if let Some(filter_field) = filter { filters.push(SearchFilterExpression::Eq( "filterField".parse()?, Some(filter_field.try_into()?), )); } let search = Search { index_name: "test.by_text".parse()?, table: self.table_name.clone(), filters, }; let query = Query { source: QuerySource::Search(search), operators: vec![QueryOperator::Limit(MAX_CANDIDATE_REVISIONS)], }; let mut query_stream = match version { SearchVersion::V1 => ResolvedQuery::new_bounded( tx, self.namespace, query, PaginationOptions::ManualPagination { start_cursor: None, maximum_rows_read: None, maximum_bytes_read: None, }, None, TableFilter::ExcludePrivateSystemTables, )?, SearchVersion::V2 => ResolvedQuery::new_bounded( tx, self.namespace, query, PaginationOptions::ManualPagination { start_cursor: None, maximum_rows_read: None, maximum_bytes_read: None, }, Some(MIN_NPM_VERSION_FOR_FUZZY_SEARCH.clone()), TableFilter::ExcludePrivateSystemTables, )?, }; let mut returned = Vec::new(); while let Some(value) = query_stream.next(tx, None).await? { must_let!(let Some(cursor) = query_stream.cursor()); must_let!(let CursorPosition::After(index_key) = cursor.position); let reader = &mut &index_key[..]; let index_key_values = bytes_to_values(reader)?; must_let!( let Some(ConvexValue::Float64(negative_score)) = index_key_values[0].clone() ); returned.push((value.id(), -negative_score)) } Ok(returned) } async fn _query_with_scores<S: Into<String>>( &self, query_string: S, filter: Option<String>, ts: Option<Timestamp>, version: SearchVersion, ) -> anyhow::Result<Vec<(ResolvedDocumentId, f64)>> { let ts = ts.unwrap_or_else(|| *self.database.now_ts_for_reads()); let mut tx = self .database .begin_with_ts(Identity::system(), ts, FunctionUsageTracker::new()) .await?; self.query_in_tx(&mut tx, query_string, filter, version) .await } async fn query_with_scores( &self, test_query: &TestQuery, ts: Option<Timestamp>, version: SearchVersion, ) -> anyhow::Result<Vec<(ResolvedDocumentId, f64)>> { let query_string = test_query .search .iter() .map(|key| key.to_string()) .collect::<Vec<_>>() .join(" "); let filter_field = test_query.filter.map(|filter| filter.to_string()); self._query_with_scores(query_string, filter_field, ts, version) .await } // TODO: remove once not needed anymore. I forsee still using this for testing a // bit so not removing for now. #[allow(unused)] async fn query_with_scores_and_verify_version_results_match( &self, test_query: &TestQuery, ts: Option<Timestamp>, ) -> anyhow::Result<Vec<(ResolvedDocumentId, f64)>> { let left = self .query_with_scores(test_query, ts, SearchVersion::V2) .await?; let right = self .query_with_scores(test_query, ts, SearchVersion::V2) .await?; assert_query_results_approx_equal(&left, &right); Ok(left) } // Box the future to avoid stack overflow. fn patch( &mut self, key: TestKey, search_field: Vec<TestValue>, filter_field: TestValue, ) -> BoxFuture<'_, anyhow::Result<Timestamp>> { let future = async move { let text = search_field .iter() .map(|value| value.to_string()) .collect::<Vec<_>>() .join(" "); let filter_field = format!("{filter_field:?}"); let (_, ts) = self._patch(key.to_string(), text, filter_field).await?; Ok(ts) }; future.boxed() } async fn insert<S: Into<String>, F: Into<String>>( &self, search_field: S, filter_field: F, ) -> anyhow::Result<Timestamp> { let search_field = search_field.into(); let filter_field = filter_field.into(); let mut tx = self.database.begin(Identity::system()).await?; TestFacingModel::new(&mut tx) .insert( &self.table_name, assert_obj!("searchField" => search_field, "filterField" => filter_field), ) .await?; self.database.commit(tx).await } async fn _patch<K: Into<String>, S: Into<String>, F: Into<String>>( &mut self, key: K, search_field: S, filter_field: F, ) -> anyhow::Result<(ResolvedDocumentId, Timestamp)> { let key = key.into(); let search_field = search_field.into(); let filter_field = filter_field.into(); let mut tx = self.database.begin(Identity::system()).await?; let new_document = assert_obj!("searchField" => search_field.clone(), "filterField" => filter_field.clone()); let document_id = match self.model.entry(key) { Entry::Vacant(e) => { let document_id = TestFacingModel::new(&mut tx) .insert(&self.table_name, new_document) .await?; e.insert((document_id, search_field, filter_field)); document_id }, Entry::Occupied(mut e) => { let (document_id, ..) = e.get(); UserFacingModel::new_root_for_test(&mut tx) .patch((*document_id).into(), new_document.into()) .await?; e.get_mut().1 = search_field; e.get_mut().2 = filter_field; e.get().0 }, }; let ts = self.database.commit(tx).await?; Ok((document_id, ts)) } async fn execute(&mut self, action: TestAction) -> anyhow::Result<()> { match action { TestAction::Backfill => { self.backfill().await?; }, TestAction::Update(TestUpdate { key, search_field, filter_field, }) => { self.patch(key, search_field, filter_field).await?; }, TestAction::Delete(k) => { if let Some((id, ..)) = self.model.remove(&k.to_string()) { let mut tx = self.database.begin(Identity::system()).await?; UserFacingModel::new_root_for_test(&mut tx) .delete(id.into()) .await?; self.database.commit(tx).await?; } }, TestAction::QueryAndCheckResults(query) => { let query_set: BTreeSet<_> = query .search .clone() .into_iter() .map(|s| s.to_string()) .collect(); // Find all the documents that contain any of the keywords and // match the filter. let expected: BTreeSet<_> = self .model .values() .filter(|(_, values, _)| { values.split(' ').any(|value| query_set.contains(value)) }) .filter(|(_, _, filter_field)| { if let Some(filter) = query.filter { filter_field == &filter.to_string() } else { true } }) .map(|(id, ..)| *id) .collect(); let returned: BTreeSet<_> = self .query_with_scores(&query, None, SearchVersion::V2) .await? .into_iter() .map(|(id, _)| id) .collect(); assert_eq!(expected, returned); }, TestAction::QueryAndCheckScores(query) => { // Get the scores and then do a backfill and retreive them again. // This confirms that they are consistent even when they are computed // from memory or disk. let memory_results = self .query_with_scores(&query, None, SearchVersion::V2) .await?; self.backfill().await?; let disk_results = self .query_with_scores(&query, None, SearchVersion::V2) .await?; assert_query_results_approx_equal(&memory_results, &disk_results); }, TestAction::Compact => { self.compact().await?; }, } self.database.memory_consistency_check()?; Ok(()) } } fn assert_query_results_approx_equal( left: &Vec<(ResolvedDocumentId, f64)>, right: &Vec<(ResolvedDocumentId, f64)>, ) { assert_eq!(left.len(), right.len()); for ((left_id, left_score), (right_id, right_score)) in left.iter().zip(right.iter()) { assert_eq!(left_id, right_id); assert_approx_equal(*left_score, *right_score); } } #[derive(Debug, Arbitrary, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] enum TestKey { A, B, C, D, } impl Display for TestKey { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{self:?}") } } #[derive(Debug, Arbitrary, Copy, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)] enum TestValue { A, B, C, D, } impl Display for TestValue { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{self:?}") } } #[derive(Debug, Arbitrary)] struct TestQuery { #[proptest(strategy = "prop::collection::vec(any::<TestValue>(), 0..4)")] search: Vec<TestValue>, filter: Option<TestValue>, } #[derive(Debug, Arbitrary)] struct TestUpdate { key: TestKey, search_field: Vec<TestValue>, filter_field: TestValue, } #[derive(Debug, Arbitrary)] enum TestAction { Backfill, Update(TestUpdate), Delete(TestKey), QueryAndCheckResults(TestQuery), QueryAndCheckScores(TestQuery), Compact, } fn test_search_actions(actions: Vec<TestAction>) { let td = TestDriver::new(); let rt = td.rt(); let future = async move { let mut scenario = Scenario::new(rt).await?; for action in actions { scenario.execute(action).await?; } anyhow::Ok(()) }; td.run_until(future).unwrap(); } proptest! { #![proptest_config(ProptestConfig { cases: 32 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, .. ProptestConfig::default() })] /// Randomized search test /// /// This does a random sequence of updates, deletes, and backfills and checks: /// 1. That querying returns the right set of results. /// 2. That the scores produced by the in-memory search index match the disk /// index scores. #[test] fn proptest_search_results(actions in prop::collection::vec(any::<TestAction>(), 1..16)) { test_search_actions(actions); } #[test] fn proptest_single_query( updates in prop::collection::vec(any::<TestUpdate>(), 1..8), query in any::<TestQuery>(), ) { let mut actions: Vec<_> = updates.into_iter().map(TestAction::Update).collect(); actions.push(TestAction::QueryAndCheckScores(query)); test_search_actions(actions); } } /// A non-randomized test to check the BM25 score value. /// /// Our randomized tests ensure that the scores from the in-memory index /// match the disk index scores, but don't check the actual score value. /// /// This makes sure that the scores actually match what BM25 is supposed to /// produce. #[convex_macro::test_runtime] async fn test_search_score(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario .patch(TestKey::A, vec![TestValue::A], TestValue::A) .await?; let query = TestQuery { search: vec![TestValue::A], filter: None, }; { let results = scenario .query_with_scores(&query, None, SearchVersion::V1) .await?; assert_eq!(results.len(), 1); assert_approx_equal(results.first().unwrap().1, 0.2876); } { let results = scenario .query_with_scores(&query, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 1); assert_approx_equal(results.first().unwrap().1, 0.2876); } anyhow::Ok(()) } /// A non-randomized test to test querying our in-memory index at historical /// timestamps. /// /// The randomized tests do all the querying at the current time. #[convex_macro::test_runtime] async fn test_historical_timestamps(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; let query = TestQuery { search: vec![TestValue::A], filter: None, }; let ts1 = scenario .patch(TestKey::A, vec![TestValue::A], TestValue::A) .await?; let ts2 = scenario .patch(TestKey::B, vec![TestValue::A], TestValue::A) .await?; let ts3 = scenario .patch(TestKey::C, vec![TestValue::A], TestValue::A) .await?; let ts4 = scenario .patch(TestKey::D, vec![TestValue::A], TestValue::A) .await?; assert_eq!( scenario .query_with_scores(&query, Some(ts1), SearchVersion::V2) .await? .len(), 1 ); assert_eq!( scenario .query_with_scores(&query, Some(ts2), SearchVersion::V2) .await? .len(), 2 ); assert_eq!( scenario .query_with_scores(&query, Some(ts3), SearchVersion::V2) .await? .len(), 3 ); assert_eq!( scenario .query_with_scores(&query, Some(ts4), SearchVersion::V2) .await? .len(), 4 ); anyhow::Ok(()) } /// A test for a query where there is one document in the disk index /// but it was since deleted. #[convex_macro::test_runtime] async fn test_querying_with_zero_documents(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario .patch(TestKey::A, vec![TestValue::A], TestValue::A) .await?; scenario.execute(TestAction::Delete(TestKey::A)).await?; scenario .execute(TestAction::QueryAndCheckScores(TestQuery { search: vec![TestValue::A], filter: None, })) .await?; anyhow::Ok(()) } #[convex_macro::test_runtime] async fn test_filtering_match(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario .patch(TestKey::A, vec![TestValue::A], TestValue::A) .await?; scenario .execute(TestAction::QueryAndCheckScores(TestQuery { search: vec![TestValue::A], filter: Some(TestValue::A), })) .await?; anyhow::Ok(()) } #[convex_macro::test_runtime] async fn test_filtering_no_match(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario .patch(TestKey::A, vec![TestValue::A], TestValue::A) .await?; scenario .execute(TestAction::QueryAndCheckScores(TestQuery { search: vec![TestValue::A], filter: Some(TestValue::B), })) .await?; anyhow::Ok(()) } #[convex_macro::test_runtime] async fn test_filtering_match_deleted(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario .patch(TestKey::A, vec![TestValue::A], TestValue::A) .await?; scenario.execute(TestAction::Delete(TestKey::A)).await?; scenario .execute(TestAction::QueryAndCheckScores(TestQuery { search: vec![TestValue::A], filter: Some(TestValue::A), })) .await?; anyhow::Ok(()) } #[convex_macro::test_runtime] async fn test_filtering_match_updates(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario .patch(TestKey::A, vec![TestValue::A], TestValue::A) .await?; scenario .patch(TestKey::A, vec![TestValue::A], TestValue::B) .await?; scenario .execute(TestAction::QueryAndCheckScores(TestQuery { search: vec![TestValue::A], filter: Some(TestValue::A), })) .await?; anyhow::Ok(()) } #[convex_macro::test_runtime] async fn test_bm25_stats_no_underflow(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario .patch(TestKey::C, vec![TestValue::D], TestValue::A) .await?; scenario.execute(TestAction::Backfill).await?; scenario.execute(TestAction::Delete(TestKey::C)).await?; // This query doens't use the filter field, so the BM25 stats will not include // the filter field while the commit statistics will in the memory index from // the delete. scenario .execute(TestAction::QueryAndCheckScores(TestQuery { search: vec![TestValue::D], filter: None, })) .await?; anyhow::Ok(()) } // Regression test: We had a bug where we were computing the index of a matching // union term incorrectly. // // This test assigns term ID 0 to a filter term (by creating a document with an // empty search field first) and then term ID 1 to a term included in the query. // Then, we perform a matching query, checking that the computed offset for the // union matching term is in bounds. #[test] fn test_union_rank() { let actions = vec![ TestAction::Update(TestUpdate { key: TestKey::A, search_field: vec![], filter_field: TestValue::A, }), TestAction::Update(TestUpdate { key: TestKey::A, search_field: vec![TestValue::B], filter_field: TestValue::A, }), TestAction::QueryAndCheckResults(TestQuery { search: vec![TestValue::B], filter: Some(TestValue::A), }), ]; test_search_actions(actions); } #[convex_macro::test_runtime] async fn test_prefix_mem(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario._patch("a", "the quick brow fox", "test").await?; // Exact match w/o prefix will fail let results = scenario ._query_with_scores("bro test", None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 0); // Exact match w/ prefix will succeed let results = scenario ._query_with_scores("bro", None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 1); // Prefix scenario ._patch("b", "this is some aghhhhhhhhh... random article", "test") .await?; let results = scenario ._query_with_scores("aghh", None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 1); Ok(()) } // Previous regression #[convex_macro::test_runtime] async fn test_fuzzy_disk_snapshot_shortlist_ids_valid_with_empty_memory_index( rt: TestRuntime, ) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; for i in 0..2048 { scenario ._patch(format!("{i:}"), "rakeeb wuz here", "test") .await?; } scenario.backfill().await?; let results = scenario ._query_with_scores("rak", None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), MAX_CANDIDATE_REVISIONS); Ok(()) } // See https://github.com/get-convex/convex/pull/20649 #[convex_macro::test_runtime] async fn unrelated_sentences_are_queryable_after_flush(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario._patch("key1", "rakeeb wuz here", "test").await?; scenario ._patch("key2", "some other sentence", "test") .await?; scenario.backfill().await?; let results = scenario ._query_with_scores("rakeeb", None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 1); let results = scenario ._query_with_scores("sentenc", None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 1); Ok(()) } #[test] fn searches_with_duplicate_terms_have_same_memory_disk_score() { let action = TestAction::Update(TestUpdate { key: TestKey::A, search_field: vec![TestValue::D], filter_field: TestValue::A, }); let query = TestAction::QueryAndCheckScores(TestQuery { search: vec![TestValue::D, TestValue::D], filter: None, }); test_search_actions(vec![action, query]); } #[convex_macro::test_runtime] async fn empty_searches_produce_no_results(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario ._patch("key1", "rakeeb \t\nwuz here", "test") .await?; scenario.backfill().await?; scenario ._patch("key2", "rakeeb wuz not here", "test") .await?; for query_string in vec!["", " ", "\n", "\t"] { let results = scenario ._query_with_scores(query_string, None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 0); } Ok(()) } #[convex_macro::test_runtime] async fn empty_search_works_while_bootstrapping(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario ._patch("key1", "rakeeb \t\nwuz here", "test") .await?; scenario.backfill().await?; scenario ._patch("key2", "rakeeb wuz not here", "test") .await?; scenario.set_bootstrapping().await?; for query_string in vec!["", " ", "\n", "\t"] { let results = scenario ._query_with_scores(query_string, None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 0); } Ok(()) } #[convex_macro::test_runtime] async fn search_fails_while_bootstrapping(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario ._patch("key1", "rakeeb \t\nwuz here", "test") .await?; scenario.backfill().await?; scenario.set_bootstrapping().await?; let err = scenario ._query_with_scores("rakeeb", None, None, SearchVersion::V2) .await .unwrap_err(); assert!(err.is_overloaded()); Ok(()) } /// Test that search works after bootstrapping has finished when there are /// writes in between bootstrap ts and the commit ts. #[convex_macro::test_runtime] async fn search_works_after_bootstrapping( rt: TestRuntime, pause_controller: PauseController, ) -> anyhow::Result<()> { let scenario = Scenario::new(rt.clone()).await?; let hold_guard = pause_controller.hold(FINISHED_BOOTSTRAP_UPDATES); let mut wait_for_blocked = hold_guard.wait_for_blocked().boxed(); let handle = scenario.database.start_search_and_vector_bootstrap(); let bootstrap_fut = handle.join().fuse(); pin_mut!(bootstrap_fut); select_biased! { _ = bootstrap_fut => { panic!("bootstrap completed before pause");}, pause_guard = wait_for_blocked.as_mut().fuse() => { if let Some(pause_guard) = pause_guard { scenario.insert("rakeeb \t\nwuz here", "test").await?; pause_guard.unpause(); } }, } bootstrap_fut.await?; scenario ._query_with_scores("rakeeb", None, None, SearchVersion::V2) .await?; Ok(()) } #[convex_macro::test_runtime] async fn test_bootstrap_search_index_recompute_pending_writes( rt: TestRuntime, pause: PauseController, ) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt.clone()).await?; scenario.set_bootstrapping().await?; let db = scenario.database.clone(); // Start a new transaction that doesn't have the text indexes in the snapshot // when it is first added to pending_writes. let mut tx_without_indexes = db.begin_system().await?; TestFacingModel::new(&mut tx_without_indexes) .insert( &scenario.table_name, assert_obj!("searchField" => "geoffry", "filterField" => "test"), ) .await?; let db_clone = db.clone(); let commit_fut = async move { db_clone.commit(tx_without_indexes).await?; anyhow::Ok(()) }; let hold_guard = pause.hold(AFTER_PENDING_WRITE_SNAPSHOT); let db_clone = db.clone(); let finish_bootstrap_fut = async move { let pause_guard = hold_guard.wait_for_blocked().await; let handle = db_clone.start_search_and_vector_bootstrap(); handle.join().await?; if let Some(pause_guard) = pause_guard { pause_guard.unpause(); } anyhow::Ok(()) }; futures::try_join!(commit_fut, finish_bootstrap_fut)?; // Start another transaction to verify text indexes were included after // recomputing pending_writes. let result = scenario ._query_with_scores("geoffry", None, None, SearchVersion::V2) .await?; assert_eq!(result.len(), 1); Ok(()) } struct BrokenSearcher; #[async_trait] impl VectorSearcher for BrokenSearcher { async fn execute_multi_segment_vector_query( &self, _: Arc<dyn Storage>, _: Vec<FragmentedVectorSegmentPaths>, _: QdrantSchema, _: CompiledVectorSearch, _: u32, ) -> anyhow::Result<Vec<VectorSearchQueryResult>> { anyhow::bail!("我"); } async fn execute_vector_compaction( &self, _: Arc<dyn Storage>, _: Vec<FragmentedVectorSegmentPaths>, _: usize, ) -> anyhow::Result<FragmentedVectorSegment> { anyhow::bail!("不"); } } #[async_trait] impl Searcher for BrokenSearcher { async fn query_tokens( &self, _: Arc<dyn Storage>, _: FragmentedTextStorageKeys, _: Vec<TokenQuery>, _: usize, ) -> anyhow::Result<Vec<TokenMatch>> { anyhow::bail!("recherche") } async fn query_bm25_stats( &self, _: Arc<dyn Storage>, _: FragmentedTextStorageKeys, _: Vec<Term>, ) -> anyhow::Result<Bm25Stats> { anyhow::bail!("plein") } async fn query_posting_lists( &self, _: Arc<dyn Storage>, _: FragmentedTextStorageKeys, _: PostingListQuery, ) -> anyhow::Result<Vec<PostingListMatch>> { anyhow::bail!("texte"); } async fn execute_text_compaction( &self, _search_storage: Arc<dyn Storage>, _segments: Vec<FragmentedTextStorageKeys>, ) -> anyhow::Result<FragmentedTextSegment> { anyhow::bail!("真不要") } } #[convex_macro::test_runtime] async fn empty_searches_with_broken_searcher_return_empty_results( rt: TestRuntime, ) -> anyhow::Result<()> { let mut scenario = Scenario::new_with_searcher(rt, BrokenSearcher).await?; scenario._patch("key1", "rakeeb wuz here", "test").await?; scenario.backfill().await?; scenario ._patch("key2", "rakeeb wuz not here", "test") .await?; for query_string in vec!["", " ", "\n", "\t"] { let results = scenario ._query_with_scores(query_string, None, None, SearchVersion::V2) .await?; assert_eq!(results.len(), 0); } Ok(()) } #[test] fn test_scores_when_some_but_not_all_term_values_for_a_field_are_deleted() -> anyhow::Result<()> { // This is taken directly from a proptest failure. We could simplify it further, // but it catches a reasonably small error as is. The main thing to note is // that the first Update action adds Document A, which is then replaced by // the second update action, changing the term frequency. If the test fails, // it means the memory and disk are index are not keeping the same statistics // during the replace operation, or are not scoring the same based on the same // statistics. let actions = vec![ // 9 As, 2 Ds TestAction::Update(TestUpdate { key: TestKey::A, search_field: vec![ TestValue::A, TestValue::A, TestValue::A, TestValue::A, TestValue::A, TestValue::A, TestValue::D, TestValue::A, TestValue::D, TestValue::A, TestValue::A, ], filter_field: TestValue::A, }), TestAction::QueryAndCheckScores(TestQuery { search: vec![], filter: None, }), // 4 As, 4 Cs TestAction::Update(TestUpdate { key: TestKey::A, search_field: vec![ TestValue::A, TestValue::C, TestValue::A, TestValue::C, TestValue::C, TestValue::A, TestValue::A, TestValue::C, ], filter_field: TestValue::A, }), // 6 As, 3 Bs, 2Cs, 8Ds TestAction::Update(TestUpdate { key: TestKey::B, search_field: vec![ TestValue::C, TestValue::D, TestValue::D, TestValue::D, TestValue::C, TestValue::A, TestValue::A, TestValue::B, TestValue::D, TestValue::D, TestValue::C, TestValue::D, TestValue::D, TestValue::A, TestValue::A, TestValue::A, TestValue::B, TestValue::B, TestValue::D, TestValue::A, TestValue::B, ], filter_field: TestValue::C, }), TestAction::QueryAndCheckScores(TestQuery { search: vec![TestValue::C], filter: Some(TestValue::C), }), ]; test_search_actions(actions); Ok(()) } /// Generates ASCII alphanumeric strings of length 1..32 fn tokenizable_string_strategy() -> impl Strategy<Value = String> { prop::string::string_regex("[a-zA-Z0-9]{1,32}").unwrap() } fn generate_perturbed_query(query: Vec<String>) -> impl Strategy<Value = Vec<String>> { Just(query).prop_shuffle() } fn generate_document( query: Vec<String>, fluff: Range<usize>, ) -> impl Strategy<Value = Vec<String>> { ( prop::collection::vec(tokenizable_string_strategy(), fluff), generate_perturbed_query(query), ) .prop_map(|(fluff, perturbed_query)| { let mut res = Vec::with_capacity(perturbed_query.len() * fluff.len()); if let Some(q) = perturbed_query.first() { res.push(q.clone()); } for q in perturbed_query.into_iter().skip(1) { res.extend_from_slice(&fluff); res.push(q.clone()); } res }) } #[derive(Debug, Clone)] struct FuzzyDeterminismTestCase { // Let N be the length of `query` // Each document is a list of N-1 sets of fluff terms, a permutation of `query`, a list of // operations to apply to the query terms. documents: Vec<Vec<String>>, query: Vec<String>, } struct FuzzyDeterminismArbitraryParams { query_size: Range<usize>, num_docs: Range<usize>, fluff_len: Range<usize>, } impl Default for FuzzyDeterminismArbitraryParams { fn default() -> Self { Self { query_size: 2..8, // TODO: Since we don't have a deterministic order when querying within a Tantivy // segment, we have to query less than 1024 results here. num_docs: 2..1023, fluff_len: 0..16, } } } impl Arbitrary for FuzzyDeterminismTestCase { type Parameters = FuzzyDeterminismArbitraryParams; type Strategy = impl Strategy<Value = FuzzyDeterminismTestCase>; fn arbitrary_with( FuzzyDeterminismArbitraryParams { query_size, num_docs, fluff_len, }: Self::Parameters, ) -> Self::Strategy { // First, generate a query prop::collection::vec(tokenizable_string_strategy(), query_size) .prop_flat_map(move |query| { // Generate up to num_docs documents ( Just(query.clone()), prop::collection::vec( generate_document(query, fluff_len.clone()), num_docs.clone(), ), ) }) .prop_map(|(query, documents)| FuzzyDeterminismTestCase { documents, query }) .no_shrink() } } async fn do_search_with_backfill_split( mut scenario: Scenario, test_case: FuzzyDeterminismTestCase, split: usize, ) -> anyhow::Result<Vec<(usize, f64)>> { let mut index_by_document_id = BTreeMap::new(); for (i, doc) in test_case.documents.into_iter().enumerate() { if i == split { scenario.backfill().await?; } let key = format!("{i}"); let doc = doc.join(" "); let (document_id, _) = scenario._patch(key, doc, "test").await?; index_by_document_id.insert(document_id, i); } let query = test_case.query.join(" "); let results = scenario ._query_with_scores(query, None, None, SearchVersion::V2) .await? .into_iter() .map(|(document_id, score)| (index_by_document_id[&document_id], score)) .collect(); Ok(results) } fn do_search_for_fraction(test_case: FuzzyDeterminismTestCase, num_splits: usize) { let td = TestDriver::new(); let rt = td.rt(); let future = async move { let mut last_result: Option<BTreeSet<(usize, TotalOrdF64)>> = None; for split in 0..num_splits { let split_idx = (test_case.documents.len() * split) / num_splits; let scenario = Scenario::new(rt.clone()).await?; let result = do_search_with_backfill_split(scenario, test_case.clone(), split_idx) .await? .into_iter() .map(|(i, score)| (i, TotalOrdF64::from(score))) .collect::<BTreeSet<_>>(); if let Some(last_result) = last_result { if result != last_result { let mut msg = format!( "Results differ when doing {} vs. {} splits:", num_splits - 1, num_splits ); for added in result.difference(&last_result) { msg.push_str(&format!("\n added: {added:?}")); } for removed in last_result.difference(&result) { msg.push_str(&format!("\n removed: {removed:?}")); } panic!("{msg}"); } } last_result = Some(result); } anyhow::Ok(()) }; td.run_until(future).unwrap(); } proptest! { // Increase number of cases being run with CONVEX_PROPTEST_MULTIPLIER to test query determinism // more rigorously. #![proptest_config(ProptestConfig { cases: env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, .. ProptestConfig::default() })] #[test] fn proptest_mem_disk_determinism( test_case in any::<FuzzyDeterminismTestCase>(), num_splits in 2_usize..=4_usize, ) { do_search_for_fraction(test_case, num_splits); } } #[convex_macro::test_runtime] async fn test_flushing_does_not_invalidate_subscriptions(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt).await?; scenario.insert("existing text", "a").await?; scenario.backfill().await?; let mut tx = scenario.database.begin_system().await?; let results = scenario .query_in_tx(&mut tx, "existing", None, SearchVersion::V2) .await?; assert_eq!(results.len(), 1); let token = tx.into_token()?; // Force a checkpoint by advancing time scenario .rt .advance_time(*DATABASE_WORKERS_MAX_CHECKPOINT_AGE * 2) .await; scenario.insert("new text", "b").await?; scenario.backfill().await?; // Our old subscription token should not be invalidated let ts = *scenario.database.now_ts_for_reads(); assert!(scenario .database .refresh_token(token.clone(), ts) .await? .is_ok()); // TODO(ENG-9324): deleting the index *should* invalidate the transaction, but // it currently does not. let mut tx = scenario.database.begin_system().await?; IndexModel::new(&mut tx) .drop_index(scenario.index_id) .await?; scenario.database.commit(tx).await?; let ts = *scenario.database.now_ts_for_reads(); // this *should* return None, but for now it doesn't. assert!(scenario.database.refresh_token(token, ts).await?.is_ok()); Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server