Skip to main content
Glama

Convex MCP server

Official
by get-convex
vector_tests.rs33.5 kB
use std::{ collections::{ BTreeMap, BTreeSet, }, sync::Arc, }; use cmd_util::env::env_config; use common::{ bootstrap_model::index::{ vector_index::{ VectorIndexBackfillState, VectorIndexSnapshot, VectorIndexSnapshotData, VectorIndexSpec, VectorIndexState, }, IndexConfig, IndexMetadata, }, components::ComponentId, knobs::{ MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, VECTOR_INDEX_SIZE_SOFT_LIMIT, }, persistence::PersistenceReader, runtime::Runtime, types::{ unchecked_repeatable_ts, IndexDescriptor, IndexName, }, }; use itertools::Itertools; use keybroker::Identity; use maplit::{ btreemap, btreeset, }; use must_let::must_let; use proptest::prelude::*; use proptest_derive::Arbitrary; use qdrant_segment::types::VECTOR_ELEMENT_SIZE; use runtime::{ prod::ProdRuntime, testing::{ TestDriver, TestRuntime, }, }; use search::searcher::{ InProcessSearcher, Searcher, }; use storage::Storage; use value::{ assert_obj, ConvexObject, ConvexValue, DeveloperDocumentId, TableName, TableNamespace, }; use vector::{ cosine_similarity, PublicVectorSearchQueryResult, VectorSearch, VectorSearchExpression, }; use crate::{ search_index_workers::FlusherType, test_helpers::{ vector_utils::{ random_vector, random_vector_value, vector_to_value, DIMENSIONS, }, DbFixtures, DbFixturesArgs, }, tests::vector_test_utils::{ IndexData, VectorFixtures, }, vector_index_worker::{ compactor::compact_vector_indexes_in_test, flusher::{ backfill_vector_indexes, new_vector_flusher_for_tests, }, }, Database, IndexModel, TableModel, UserFacingModel, VectorIndexFlusher, }; const TABLE_NAME: &str = "test"; const INDEX_DESCRIPTOR: &str = "by_embedding"; const INDEX_NAME: &str = "test.by_embedding"; const INDEXED_FIELD: &str = "embedding"; const FILTER_FIELDS: &[&str] = &["A", "B", "C", "D"]; const TABLE_NAMESPACE: TableNamespace = TableNamespace::test_user(); struct Scenario<RT: Runtime> { rt: RT, database: Database<RT>, reader: Arc<dyn PersistenceReader>, search_storage: Arc<dyn Storage>, searcher: Arc<dyn Searcher>, } impl<RT: Runtime> Scenario<RT> { async fn new(rt: RT) -> anyhow::Result<Self> { let DbFixtures { tp, db, searcher, search_storage, .. } = DbFixtures::new_with_args( &rt, DbFixturesArgs { searcher: Some(Arc::new(InProcessSearcher::new(rt.clone()).await?)), ..Default::default() }, ) .await?; let handle = db.start_search_and_vector_bootstrap(); handle.join().await?; let self_ = Self { rt, database: db, reader: tp.reader(), search_storage, searcher, }; Ok(self_) } async fn new_with_enabled_index(rt: RT) -> anyhow::Result<Self> { let self_ = Scenario::new(rt).await?; self_.add_vector_index(true).await?; Ok(self_) } fn new_backfill_flusher(&self, incremental_index_size: usize) -> VectorIndexFlusher<RT> { new_vector_flusher_for_tests( self.rt.clone(), self.database.clone(), self.reader.clone(), self.search_storage.clone(), *VECTOR_INDEX_SIZE_SOFT_LIMIT, *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, incremental_index_size, FlusherType::Backfill, ) } async fn add_vector_index(&self, should_backfill: bool) -> anyhow::Result<()> { let table_name: TableName = TABLE_NAME.parse()?; let mut tx = self.database.begin(Identity::system()).await?; let namespace = TableNamespace::test_user(); TableModel::new(&mut tx) .insert_table_metadata_for_test(namespace, &table_name) .await?; let index = IndexMetadata::new_backfilling_vector_index( INDEX_NAME.parse()?, INDEXED_FIELD.parse()?, DIMENSIONS.try_into()?, FILTER_FIELDS.iter().map(|f| f.parse()).try_collect()?, ); IndexModel::new(&mut tx) .add_application_index(namespace, index) .await?; self.database.commit(tx).await?; if should_backfill { self.backfill().await?; self.enable_index().await?; } Ok(()) } async fn enable_index(&self) -> anyhow::Result<()> { let mut tx = self.database.begin_system().await?; IndexModel::new(&mut tx) .enable_index_for_testing( TABLE_NAMESPACE, &IndexName::new(TABLE_NAME.parse()?, IndexDescriptor::new(INDEX_DESCRIPTOR)?)?, ) .await?; self.database.commit(tx).await?; Ok(()) } async fn seed_table_with_vector_data( &self, num_docs: u32, ) -> anyhow::Result<Vec<DeveloperDocumentId>> { let mut ids = vec![]; for _ in 0..num_docs { let mut tx = self.database.begin(Identity::system()).await?; let vector = random_vector_value(&mut self.rt.rng()); let obj = assert_obj!(INDEXED_FIELD => vector, "A" => ConvexValue::Int64(1017)); let id = UserFacingModel::new_root_for_test(&mut tx) .insert(TABLE_NAME.parse()?, obj) .await?; ids.push(id); self.database.commit(tx).await?; } Ok(ids) } async fn compact(&mut self) -> anyhow::Result<()> { compact_vector_indexes_in_test( self.rt.clone(), self.database.clone(), self.reader.clone(), self.search_storage.clone(), self.searcher.clone(), ) .await } async fn backfill(&self) -> anyhow::Result<()> { backfill_vector_indexes( self.rt.clone(), self.database.clone(), self.reader.clone(), self.search_storage.clone(), ) .await?; Ok(()) } async fn search( &self, vector: Vec<f32>, filter_expressions: BTreeSet<VectorSearchExpression>, ) -> anyhow::Result<Vec<PublicVectorSearchQueryResult>> { self.search_with_limit(vector, filter_expressions, None) .await } async fn search_with_limit( &self, vector: Vec<f32>, filter_expressions: BTreeSet<VectorSearchExpression>, limit: Option<u32>, ) -> anyhow::Result<Vec<PublicVectorSearchQueryResult>> { let (results, _usage_stats) = self .database .vector_search( Identity::system(), VectorSearch { index_name: INDEX_NAME.parse()?, component_id: ComponentId::Root, vector, limit, expressions: filter_expressions, }, ) .await?; Ok(results) } pub async fn get_vector_index_configs( &self, ) -> anyhow::Result<Vec<(VectorIndexSpec, VectorIndexState)>> { let mut tx = self.database.begin_system().await?; let mut model = IndexModel::new(&mut tx); Ok(model .get_all_indexes() .await? .into_iter() .filter_map(|idx| { if let IndexConfig::Vector { spec, on_disk_state, } = idx.config.clone() { Some((spec, on_disk_state)) } else { None } }) .collect_vec()) } } #[derive(Debug, Arbitrary, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] struct TestKey { // At most 8 vectors in our test. #[proptest(strategy = "0..8u32")] number: u32, } #[derive(Debug, Arbitrary, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] enum FilterKey { A, B, C, D, } #[derive(Debug, Arbitrary, Copy, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)] enum FilterValue { A, B, C, D, } fn vector_strategy() -> impl Strategy<Value = Vec<f32>> { // Uniformly sample points in [-1, 1]^4, excluding a small neighborhood of the // origin, and normalize to a unit vector. let range = prop_oneof![-1.0f32..-1e-4, 1e-4..1.0f32]; prop::collection::vec(range, DIMENSIONS as usize).prop_map(|v| { let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt(); v.into_iter().map(|x| x / norm).collect() }) } #[derive(Debug, Arbitrary)] struct TestUpdate { key: TestKey, #[proptest(strategy = "vector_strategy()")] vector: Vec<f32>, #[proptest( strategy = "prop::collection::btree_map(any::<FilterKey>(), any::<FilterValue>(), 0..=4)" )] filter_values: BTreeMap<FilterKey, FilterValue>, } #[derive(Debug, Arbitrary)] struct TestQuery { #[proptest(strategy = "vector_strategy()")] vector: Vec<f32>, #[proptest(strategy = "prop::collection::btree_map(any::<FilterKey>(), \ prop::collection::btree_set(any::<Option<FilterValue>>(), 1..=4), \ 1..=4)")] filter: BTreeMap<FilterKey, BTreeSet<Option<FilterValue>>>, #[proptest(strategy = "1..16u32")] limit: u32, } #[derive(Debug, Arbitrary)] enum TestAction { Backfill, Update(TestUpdate), Delete(TestKey), QueryAndCheckResults(TestQuery), } struct RandomizedTest<RT: Runtime> { scenario: Scenario<RT>, model: BTreeMap<TestKey, (DeveloperDocumentId, TestUpdate)>, } impl<RT: Runtime> RandomizedTest<RT> { async fn new(rt: RT) -> anyhow::Result<Self> { Ok(Self { scenario: Scenario::new_with_enabled_index(rt).await?, model: BTreeMap::new(), }) } async fn execute(&mut self, action: TestAction) -> anyhow::Result<()> { match action { TestAction::Backfill => self.scenario.backfill().await?, TestAction::Update(update) => { let mut tx = self.scenario.database.begin_system().await?; let mut new_obj = BTreeMap::new(); new_obj.insert( INDEXED_FIELD.parse()?, vector_to_value(update.vector.clone()), ); for (key, value) in &update.filter_values { new_obj.insert( format!("{key:?}").parse()?, ConvexValue::String(format!("{value:?}").try_into()?), ); } let new_obj = ConvexObject::try_from(new_obj)?; let id = if let Some((document_id, _)) = self.model.remove(&update.key) { UserFacingModel::new_root_for_test(&mut tx) .replace(document_id, new_obj) .await?; document_id } else { UserFacingModel::new_root_for_test(&mut tx) .insert(TABLE_NAME.parse()?, new_obj) .await? }; self.scenario.database.commit(tx).await?; self.model.insert(update.key, (id, update)); }, TestAction::Delete(key) => { if let Some((document_id, _)) = self.model.remove(&key) { let mut tx = self.scenario.database.begin_system().await?; UserFacingModel::new_root_for_test(&mut tx) .delete(document_id) .await?; self.scenario.database.commit(tx).await?; } }, TestAction::QueryAndCheckResults(test_query) => { let mut expressions = btreeset![]; for (key, maybe_values) in test_query.filter.clone() { let values: BTreeSet<_> = maybe_values .into_iter() .map(|value| { let value = value .map(|value| { anyhow::Ok(ConvexValue::String( format!("{value:?}").try_into()?, )) }) .transpose()?; anyhow::Ok(value) }) .try_collect()?; let expression = if values.len() == 1 { let value = values.into_iter().next().unwrap(); VectorSearchExpression::Eq(format!("{key:?}").parse()?, value) } else { VectorSearchExpression::In(format!("{key:?}").parse()?, values) }; expressions.insert(expression); } let query = VectorSearch { index_name: INDEX_NAME.parse()?, component_id: ComponentId::Root, vector: test_query.vector.clone(), limit: Some(test_query.limit), expressions, }; let (returned_results, _usage_stats) = self .scenario .database .vector_search(Identity::system(), query) .await?; let mut expected_results = vec![]; for (id, update) in self.model.values() { let mut matches_filter = false; for (key, maybe_values) in &test_query.filter { if maybe_values .iter() .any(|value| update.filter_values.get(key) == value.as_ref()) { matches_filter = true; break; } } if !matches_filter { continue; } let score = cosine_similarity(&test_query.vector, &update.vector); expected_results.push(PublicVectorSearchQueryResult { id: *id, score }); } expected_results.sort_by(|a, b| a.cmp(b).reverse()); expected_results.truncate(test_query.limit as usize); assert_eq!(returned_results, expected_results); }, } Ok(()) } } #[convex_macro::test_runtime] async fn test_vector_search(rt: TestRuntime) -> anyhow::Result<()> { let scenario = Scenario::new_with_enabled_index(rt.clone()).await?; let mut tx = scenario.database.begin(Identity::system()).await?; let vector1 = random_vector_value(&mut rt.rng()); let obj = assert_obj!(INDEXED_FIELD => vector1, "A" => ConvexValue::Int64(1017)); let id1 = UserFacingModel::new_root_for_test(&mut tx) .insert(TABLE_NAME.parse()?, obj) .await?; let vector2 = random_vector_value(&mut rt.rng()); let obj = assert_obj!(INDEXED_FIELD => vector2); let id2 = UserFacingModel::new_root_for_test(&mut tx) .insert(TABLE_NAME.parse()?, obj.clone()) .await?; scenario.database.commit(tx).await?; for _ in 0..2 { // Check that no filter returns both results. let results = scenario.search(vec![0.; 4], btreeset![]).await?; assert_eq!(results.len(), 2); // Check that filtering for just 1017 gets only the first vector. let match_first = VectorSearchExpression::Eq("A".parse()?, Some(ConvexValue::Int64(1017))); let results = scenario.search(vec![0.; 4], btreeset![match_first]).await?; assert_eq!(results.len(), 1); assert_eq!(results[0].id.internal_id(), id1.internal_id()); // Check that filtering for a nonexistent field only gets the second vector. let match_second = VectorSearchExpression::Eq("A".parse()?, None); let results = scenario .search(vec![0.; 4], btreeset![match_second]) .await?; assert_eq!(results[0].id.internal_id(), id2.internal_id()); // Check that filtering for a field in neither vector returns zero results. let match_neither = VectorSearchExpression::Eq("A".parse()?, Some(ConvexValue::Int64(1018))); let results = scenario .search(vec![0.; 4], btreeset![match_neither]) .await?; assert_eq!(results.len(), 0); // Backfill and repeat once to check the disk index. scenario.backfill().await?; } // Test deleting the first vector and checking that it doesn't show up in // results. let mut tx = scenario.database.begin(Identity::system()).await?; UserFacingModel::new_root_for_test(&mut tx) .delete(id1) .await?; scenario.database.commit(tx).await?; for _ in 0..2 { let results = scenario.search(vec![0.; 4], btreeset![]).await?; assert_eq!(results.len(), 1); assert_eq!(results[0].id.internal_id(), id2.internal_id()); scenario.backfill().await?; } // Test updating the second vector and checking that it shows up once. let mut tx = scenario.database.begin(Identity::system()).await?; UserFacingModel::new_root_for_test(&mut tx) .replace(id2, obj.clone()) .await?; scenario.database.commit(tx).await?; for _ in 0..2 { let results = scenario.search(vec![0.; 4], btreeset![]).await?; assert_eq!(results.len(), 1); assert_eq!(results[0].id.internal_id(), id2.internal_id()); scenario.backfill().await?; } // Test updating the second vector a second time and checking that it still // shows up once. let mut tx = scenario.database.begin(Identity::system()).await?; UserFacingModel::new_root_for_test(&mut tx) .replace(id2, obj) .await?; scenario.database.commit(tx).await?; for _ in 0..2 { let results = scenario.search(vec![0.; 4], btreeset![]).await?; assert_eq!(results.len(), 1); assert_eq!(results[0].id.internal_id(), id2.internal_id()); scenario.backfill().await?; } Ok(()) } #[convex_macro::test_runtime] async fn test_vector_search_compaction(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new_with_enabled_index(rt.clone()).await?; let mut ids = vec![]; let mut rng = rt.rng(); // Then compact the new round of segments and the original one into a single // large segment. for _ in 0..3 { // Create 3 segments and compact them into one segment for _ in 0..3 { let mut tx = scenario.database.begin(Identity::system()).await?; let vector = random_vector_value(&mut rng); let obj = assert_obj!(INDEXED_FIELD => vector, "A" => ConvexValue::Int64(1017)); let id = UserFacingModel::new_root_for_test(&mut tx) .insert(TABLE_NAME.parse()?, obj) .await?; ids.push(id); scenario.database.commit(tx).await?; // Backfill to create a new segment scenario.backfill().await?; } scenario.compact().await?; let results = scenario.search(vec![0.; 4], btreeset![]).await?; assert_eq!( results .into_iter() .map(|result| result.id.internal_id()) .sorted() .collect::<Vec<_>>(), ids.iter() .map(|id| id.internal_id()) .sorted() .collect::<Vec<_>>() ); } scenario.compact().await?; let results = scenario.search(vec![0.; 4], btreeset![]).await?; assert_eq!( results .into_iter() .map(|result| result.id.internal_id()) .sorted() .collect::<Vec<_>>(), ids.iter() .map(|id| id.internal_id()) .sorted() .collect::<Vec<_>>() ); Ok(()) } /// This test will fail flakily if we do not handle MVCC correctly on /// searchlight. That's reasonably likely because we're downloading caching and /// re-using some immutable files across different versions of indexes. #[ignore] // TODO(CX-5143): Re-enable this test after fixing the flake. #[convex_macro::prod_rt_test] async fn test_concurrent_index_version_searches(rt: ProdRuntime) -> anyhow::Result<()> { let scenario = Arc::new(Scenario::new_with_enabled_index(rt.clone()).await?); let mut ids = vec![]; let mut tx = scenario.database.begin(Identity::system()).await?; // Create a segment with N vectors for _ in 0..4 { let vector = random_vector_value(&mut rt.rng()); let obj = assert_obj!(INDEXED_FIELD => vector, "A" => ConvexValue::Int64(1017)); let id = UserFacingModel::new_root_for_test(&mut tx) .insert(TABLE_NAME.parse()?, obj) .await?; ids.push(id); } scenario.database.commit(tx).await?; scenario.backfill().await?; // Create N different versions of the index metadata where segment is // unmodified, but each one has a different deleted bitset. let mut timestamps_and_results = vec![]; for (index, id) in ids.iter().rev().enumerate() { let mut tx = scenario.database.begin(Identity::system()).await?; UserFacingModel::new_root_for_test(&mut tx) .delete(*id) .await?; let timestamp = scenario.database.commit(tx).await?; timestamps_and_results.push(( timestamp, ids.clone() .into_iter() .take(ids.len() - 1 - index) .collect::<Vec<_>>(), )); scenario.backfill().await?; } // Query all of the different versions of the index concurrently to make // sure we can handle loading and searching a single segment file with // different bitsets. let mut handles = vec![]; for (timestamp, expected_results) in timestamps_and_results { let scenario = scenario.clone(); handles.push(rt.spawn_thread("vector", move || async move { let (actual_results, _usage_stats) = scenario .database .vector_search_at_ts( VectorSearch { index_name: INDEX_NAME.parse().unwrap(), component_id: ComponentId::Root, limit: Some(10), vector: vec![0.; 4], expressions: btreeset![], }, unchecked_repeatable_ts(timestamp), ) .await .unwrap(); assert_eq!( actual_results .into_iter() .map(|result| result.id.internal_id()) .sorted() .collect_vec(), expected_results .into_iter() .map(|id| id.internal_id()) .sorted() .collect_vec() ); })) } for handle in handles { handle.join().await?; } Ok(()) } #[convex_macro::test_runtime] async fn test_vector_search_compaction_with_deletes(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new_with_enabled_index(rt.clone()).await?; let mut ids = vec![]; // Create 3 segments and compact them into one segment for _ in 0..3 { let mut tx = scenario.database.begin(Identity::system()).await?; let vector = random_vector_value(&mut rt.rng()); let obj = assert_obj!(INDEXED_FIELD => vector, "A" => ConvexValue::Int64(1017)); let id = UserFacingModel::new_root_for_test(&mut tx) .insert(TABLE_NAME.parse()?, obj) .await?; ids.push(id); scenario.database.commit(tx).await?; // Backfill to create a new segment scenario.backfill().await?; } // Then delete some ids and compact. let ids_to_delete = ids[0..ids.len() / 2].to_vec(); let mut tx = scenario.database.begin(Identity::system()).await?; for id in &ids_to_delete { UserFacingModel::new_root_for_test(&mut tx) .delete(*id) .await?; } scenario.database.commit(tx).await?; scenario.compact().await?; let results = scenario.search(vec![0.; 4], btreeset![]).await?; assert_eq!( results .into_iter() .map(|result| result.id.internal_id()) .sorted() .collect::<Vec<_>>(), ids.iter() .filter(|id| !ids_to_delete.contains(id)) .map(|id| id.internal_id()) .sorted() .collect::<Vec<_>>() ); Ok(()) } #[convex_macro::test_runtime] async fn test_index_backfill_is_incremental(rt: TestRuntime) -> anyhow::Result<()> { let scenario = Scenario::new(rt.clone()).await?; let num_parts = 12; let vectors_per_part = 8; let incremental_index_size = (DIMENSIONS * (VECTOR_ELEMENT_SIZE as u32) * vectors_per_part) as usize; let ids = scenario .seed_table_with_vector_data(num_parts * vectors_per_part) .await?; scenario.add_vector_index(false).await?; let flusher = scenario.new_backfill_flusher(incremental_index_size); let mut backfill_ts = None; for i in 0..num_parts { // Do a backfill iteration flusher.step().await?; // Fetch the current index metadata let mut vec_indexes = scenario.get_vector_index_configs().await?; assert_eq!(vec_indexes.len(), 1); let (_, on_disk_state) = vec_indexes.remove(0); // Verify that on_disk_state remains in backfilling until last iteration // and each iteration adds a new segment. if i < num_parts - 1 { must_let!(let VectorIndexState::Backfilling( VectorIndexBackfillState { segments, backfill_snapshot_ts, .. }) = on_disk_state); assert_eq!(segments.len(), (i + 1) as usize); backfill_ts = backfill_snapshot_ts; } else { must_let!(let VectorIndexState::Backfilled { snapshot: VectorIndexSnapshot { data, ts, }, .. } = on_disk_state); // Verify snapshot timestamp matches backfill timestamp assert_eq!(backfill_ts.unwrap(), ts); must_let!(let VectorIndexSnapshotData::MultiSegment(segments) = data); assert_eq!(segments.len(), (num_parts) as usize); } } // Enable the index scenario.enable_index().await?; // Verify that IDs match let results = scenario .search_with_limit(vec![0.; 4], btreeset![], Some(256)) .await?; let left = results .into_iter() .map(|result| result.id.internal_id()) .sorted() .collect::<Vec<_>>(); let right = ids .iter() .map(|id| id.internal_id()) .sorted() .collect::<Vec<_>>(); assert_eq!(left, right); Ok(()) } #[convex_macro::test_runtime] async fn test_incremental_backfill_with_compaction(rt: TestRuntime) -> anyhow::Result<()> { let mut scenario = Scenario::new(rt.clone()).await?; let num_parts = 3; let vectors_per_part = 8; let incremental_index_size = (DIMENSIONS * (VECTOR_ELEMENT_SIZE as u32) * vectors_per_part) as usize; let ids = scenario .seed_table_with_vector_data(num_parts * vectors_per_part) .await?; scenario.add_vector_index(false).await?; let flusher = scenario.new_backfill_flusher(incremental_index_size); for _ in 0..num_parts { // Do a backfill iteration flusher.step().await?; } scenario.compact().await?; // There should be 2 parts after compaction let mut vec_indexes = scenario.get_vector_index_configs().await?; assert_eq!(vec_indexes.len(), 1); let (_, on_disk_state) = vec_indexes.remove(0); must_let!(let VectorIndexState::Backfilled { snapshot: VectorIndexSnapshot { data: VectorIndexSnapshotData::MultiSegment(segments), .. }, .. } = on_disk_state); assert_eq!(segments.len(), 1); // Enable the index scenario.enable_index().await?; // Verify that IDs match let results = scenario .search_with_limit(vec![0.; 4], btreeset![], Some(256)) .await?; let left = results .into_iter() .map(|result| result.id.internal_id()) .sorted() .collect::<Vec<_>>(); let right = ids .iter() .map(|id| id.internal_id()) .sorted() .collect::<Vec<_>>(); assert_eq!(left, right); Ok(()) } #[convex_macro::test_runtime] async fn test_empty_multi_segment(rt: TestRuntime) -> anyhow::Result<()> { let scenario = Scenario::new_with_enabled_index(rt.clone()).await?; let query = random_vector(&mut rt.rng()); let results = scenario .search_with_limit(query, btreeset![], Some(10)) .await?; assert!(results.is_empty()); Ok(()) } #[convex_macro::test_runtime] async fn test_recall_multi_segment(rt: TestRuntime) -> anyhow::Result<()> { let scenario = Scenario::new_with_enabled_index(rt.clone()).await?; let mut tx = scenario.database.begin(Identity::system()).await?; let table_number = tx .table_mapping() .namespace(TABLE_NAMESPACE) .name_to_number_user_input()(TABLE_NAME.parse()?)?; let mut rng = rt.rng(); let mut by_id = BTreeMap::new(); for _ in 0..100 { let vector = random_vector(&mut rng); let obj = assert_obj!(INDEXED_FIELD => vector_to_value(vector.clone())); let id = UserFacingModel::new_root_for_test(&mut tx) .insert(TABLE_NAME.parse()?, obj) .await?; by_id.insert(id.internal_id(), vector); } scenario.database.commit(tx).await?; let limit = 10u32; let query = random_vector(&mut rng); let mut expected: Vec<_> = by_id .iter() .map(|(id, vector)| PublicVectorSearchQueryResult { id: DeveloperDocumentId::new(table_number, *id), score: cosine_similarity(&query, vector), }) .collect(); expected.sort_by(|a, b| a.cmp(b).reverse()); expected.truncate(limit as usize); for _ in 0..2 { let results = scenario .search_with_limit(query.clone(), btreeset![], Some(limit)) .await?; assert_eq!(results, expected); scenario.backfill().await?; } Ok(()) } proptest! { #![proptest_config(ProptestConfig { cases: 32 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1), failure_persistence: None, .. ProptestConfig::default() })] #[ignore] #[test] fn proptest_vector_search_results( actions in prop::collection::vec(any::<TestAction>(), 1..16), ) { let td = TestDriver::new(); let rt = td.rt(); let future = async move { let mut test = RandomizedTest::new(rt).await?; for action in actions { test.execute(action).await?; } anyhow::Ok(()) }; td.run_until(future).unwrap(); } } #[convex_macro::test_runtime] async fn test_multi_segment_search_obeys_sorted_order(rt: TestRuntime) -> anyhow::Result<()> { let fixtures = VectorFixtures::new(rt.clone()).await?; let IndexData { index_name, resolved_index_name, .. } = fixtures.enabled_vector_index().await?; let vectors = [[3f64, 4f64], [5f64, 6f64], [6f64, 7f64]]; let mut ids = vec![]; for vector in vectors { let id = fixtures .add_document_vec_array(index_name.table(), vector) .await?; ids.push(id); let worker = fixtures.new_index_flusher_with_full_scan_threshold(0, FlusherType::LiveFlush)?; let (metrics, _) = worker.step().await?; assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1}); } let (results, _usage_stats) = fixtures .db .vector_search( Identity::system(), VectorSearch { index_name: index_name.clone(), component_id: ComponentId::Root, vector: [6f64, 7f64].into_iter().map(|value| value as f32).collect(), limit: Some(3), expressions: btreeset![], }, ) .await?; // Result IDs should match IDs in **reverse order** since input vector // corresponds to [6, 7] or ids[2] assert_eq!( results .into_iter() .map(|result| result.id.internal_id()) .collect_vec(), ids.into_iter() .rev() .map(|id| id.internal_id()) .collect_vec(), ); Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server