Skip to main content
Glama

Convex MCP server

Official
by get-convex
indexes.rs10.9 kB
use std::{ collections::{ BTreeMap, BTreeSet, }, time::Duration, }; use common::{ bootstrap_model::index::{ database_index::IndexedFields, IndexMetadata, }, runtime::Runtime, testing::TestPersistence, types::{ IndexDescriptor, IndexName, }, }; use database::IndexModel; use futures::{ pin_mut, select_biased, FutureExt, }; use keybroker::Identity; use maplit::btreemap; use model::airbyte_import::AIRBYTE_PRIMARY_KEY_INDEX_DESCRIPTOR; use runtime::testing::TestRuntime; use value::{ FieldPath, TableName, TableNamespace, }; use crate::{ airbyte_import::PrimaryKey, test_helpers::{ ApplicationFixtureArgs, ApplicationTestExt, }, Application, }; #[convex_macro::test_runtime] async fn test_system_indexes(rt: TestRuntime) -> anyhow::Result<()> { let persistence = TestPersistence::new(); let application = Application::new_for_tests_with_args( &rt, ApplicationFixtureArgs { tp: Some(persistence.clone()), ..Default::default() }, ) .await?; let (index_name, indexes) = new_system_indexes_on_user_table()?; let validated_index_metadata = IndexMetadata::new_backfilling( *application.now_ts_for_reads(), index_name.clone(), application .validate_user_defined_index_fields(indexes.get(&index_name).unwrap().clone())?, ); // Add index, check that it's there application ._add_system_indexes(&Identity::system(), indexes.clone()) .await?; wait_for_backfill(&rt, &application, TableNamespace::test_user(), &index_name).await?; let mut tx = application.begin(Identity::system()).await?; let found_index = IndexModel::new(&mut tx) .enabled_index_metadata(TableNamespace::test_user(), &index_name)? .unwrap(); assert!(found_index .config .same_spec(&validated_index_metadata.config)); // Add the same index, should be no-op application ._add_system_indexes(&Identity::system(), indexes) .await?; let found_index = IndexModel::new(&mut tx) .enabled_index_metadata(TableNamespace::test_user(), &index_name)? .unwrap(); assert!(found_index .config .same_spec(&validated_index_metadata.config)); // Add different index field, should update let field_path: FieldPath = "field".parse()?; let field_paths = vec![field_path]; let index_fields: IndexedFields = field_paths.try_into()?; let indexes = btreemap! {index_name.clone() => index_fields.clone()}; application ._add_system_indexes(&Identity::system(), indexes) .await?; wait_for_backfill(&rt, &application, TableNamespace::test_user(), &index_name).await?; let mut tx = application.begin(Identity::system()).await?; let found_index = IndexModel::new(&mut tx) .enabled_index_metadata(TableNamespace::test_user(), &index_name)? .unwrap(); let validated_index_metadata = IndexMetadata::new_backfilling( *application.now_ts_for_reads(), index_name.clone(), application.validate_user_defined_index_fields(index_fields.clone())?, ); assert!(found_index .config .same_spec(&validated_index_metadata.config)); // Add many indexes let new_index_name: IndexName = IndexName::new_reserved( "table2".parse()?, AIRBYTE_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(), )?; let new_index_fields = vec!["id".parse()?].try_into()?; let indexes = btreemap! {index_name.clone() => index_fields, new_index_name.clone() => new_index_fields}; application ._add_system_indexes(&Identity::system(), indexes) .await?; wait_for_backfill(&rt, &application, TableNamespace::test_user(), &index_name).await?; wait_for_backfill( &rt, &application, TableNamespace::test_user(), &new_index_name, ) .await?; let mut tx = application.begin(Identity::system()).await?; let found_index = IndexModel::new(&mut tx) .enabled_index_metadata(TableNamespace::test_user(), &index_name)? .unwrap(); assert!(found_index .config .same_spec(&validated_index_metadata.config)); assert!(IndexModel::new(&mut tx) .enabled_index_metadata(TableNamespace::test_user(), &new_index_name)? .is_some()); Ok(()) } /// Observe a specific index via a subscription on all indexes returning /// successfully when that index moves into the enabled state. /// /// If the index is not enabled within a timeout, the method fails. /// /// This method will only work if we're writing a test where an index worker is /// already active ( e.g. the application). Some other worker must be running or /// the index state will never advance. Similarly, while this method looks /// generally, it'll currently only work for database indexes. Search indexes /// will fail because their index workers only advance them to the `Backfilled` /// state while this method will only succeed if the index moves to `Enabled`. /// We could make this modification if necessary in the future. async fn wait_for_backfill( rt: &TestRuntime, application: &Application<TestRuntime>, namespace: TableNamespace, index_name: &IndexName, ) -> anyhow::Result<()> { let timeout = rt.wait(Duration::from_secs(5)); pin_mut!(timeout); let mut timeout = timeout.fuse(); loop { let db = &application.database; let mut tx = db.begin_system().await?; let mut model = IndexModel::new(&mut tx); if model .enabled_index_metadata(namespace, index_name)? .is_some() { return Ok(()); } model.get_all_indexes().await?; let token = tx.into_token()?; let subscription = db.subscribe(token).await?; let subscription_fut = subscription.wait_for_invalidation(); pin_mut!(subscription_fut); select_biased! { _ = subscription_fut.fuse() => {}, _ = timeout => { anyhow::bail!("Timed out!"); } } } } fn new_system_indexes_on_user_table( ) -> anyhow::Result<(IndexName, BTreeMap<IndexName, IndexedFields>)> { let table_name: TableName = "table".parse()?; let index_name: IndexName = IndexName::new_reserved(table_name, AIRBYTE_PRIMARY_KEY_INDEX_DESCRIPTOR.clone())?; new_indexes(index_name) } fn new_indexes( index_name: IndexName, ) -> anyhow::Result<(IndexName, BTreeMap<IndexName, IndexedFields>)> { let field: FieldPath = "some_field".parse()?; let index_fields: IndexedFields = vec![field].try_into()?; Ok((index_name.clone(), btreemap! {index_name => index_fields})) } #[convex_macro::test_runtime] async fn new_system_indexes_are_enabled_automatically_by_index_worker( rt: TestRuntime, ) -> anyhow::Result<()> { let persistence = TestPersistence::new(); let application = Application::new_for_tests_with_args( &rt, ApplicationFixtureArgs { tp: Some(persistence.clone()), ..Default::default() }, ) .await?; let (index_name, indexes) = new_system_indexes_on_user_table()?; application ._add_system_indexes(&Identity::system(), indexes.clone()) .await?; wait_for_backfill(&rt, &application, TableNamespace::test_user(), &index_name).await?; let mut tx = application.begin(Identity::system()).await?; IndexModel::new(&mut tx) .enabled_index_metadata(TableNamespace::test_user(), &index_name)? .unwrap(); Ok(()) } #[convex_macro::test_runtime] async fn new_indexes_on_system_tables_are_enabled_automatically_by_index_worker( rt: TestRuntime, ) -> anyhow::Result<()> { let persistence = TestPersistence::new(); let application = Application::new_for_tests_with_args( &rt, ApplicationFixtureArgs { tp: Some(persistence.clone()), ..Default::default() }, ) .await?; let table_name: TableName = "_my_system_table".parse()?; let mut tx = application.begin(Identity::system()).await?; assert!( tx.create_system_table_testing(TableNamespace::test_user(), &table_name, None) .await? ); application.commit_test(tx).await?; let index_name = IndexName::new(table_name, IndexDescriptor::new("my_index")?)?; let (index_name, indexes) = new_indexes(index_name)?; application ._add_system_indexes(&Identity::system(), indexes.clone()) .await?; wait_for_backfill(&rt, &application, TableNamespace::test_user(), &index_name).await?; let mut tx = application.begin(Identity::system()).await?; IndexModel::new(&mut tx) .enabled_index_metadata(TableNamespace::test_user(), &index_name)? .unwrap(); Ok(()) } #[convex_macro::test_runtime] async fn test_indexes_ready(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; let table_1 = "table_1".parse::<TableName>()?; let table_2 = "table_2".parse::<TableName>()?; let primary_key = PrimaryKey::try_from(vec![vec!["id".to_string()]])?; let indexes = btreemap! {table_1.clone() => primary_key.clone(), table_2.clone() => primary_key}; application .add_primary_key_indexes(&Identity::system(), indexes) .await?; let indexes = BTreeSet::from([table_1, table_2]); application .wait_for_primary_key_indexes_ready(Identity::system(), indexes.clone()) .await?; let ready = application .primary_key_indexes_ready(Identity::system(), indexes) .await?; assert!(ready); Ok(()) } #[convex_macro::test_runtime] async fn test_indexes_ready_can_be_called_multiple_times(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; let table_1 = "table_1".parse::<TableName>()?; let table_2 = "table_2".parse::<TableName>()?; let primary_key = PrimaryKey::try_from(vec![vec!["id".to_string()]])?; let indexes = btreemap! {table_1.clone() => primary_key.clone(), table_2.clone() => primary_key}; application .add_primary_key_indexes(&Identity::system(), indexes) .await?; let indexes = BTreeSet::from([table_1.clone(), table_2.clone()]); application .wait_for_primary_key_indexes_ready(Identity::system(), indexes.clone()) .await?; let ready = application .primary_key_indexes_ready(Identity::system(), indexes.clone()) .await?; assert!(ready); let ready = application .primary_key_indexes_ready(Identity::system(), indexes.clone()) .await?; assert!(ready); Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server