Skip to main content
Glama

Convex MCP server

Official
by get-convex
index.rs45.3 kB
use std::collections::{ BTreeMap, BTreeSet, HashMap, }; use anyhow::Context; use common::{ bootstrap_model::index::{ database_index::{ DatabaseIndexSpec, DatabaseIndexState, IndexedFields, }, index_validation_error, text_index::{ TextIndexSpec, TextIndexState, }, vector_index::{ VectorIndexSpec, VectorIndexState, }, DeveloperIndexMetadata, IndexConfig, IndexMetadata, TabletIndexMetadata, INDEX_TABLE, }, document::ParsedDocument, interval::Interval, runtime::Runtime, schemas::{ DatabaseSchema, TableDefinition, MAX_INDEXES_PER_TABLE, }, types::{ IndexDescriptor, IndexDiff, IndexId, IndexName, StableIndexName, TableName, TabletIndexName, }, }; use errors::ErrorMetadata; use indexing::{ backend_in_memory_indexes::index_not_a_database_index_error, index_registry::{ index_not_found_error, Index, }, }; use value::{ ResolvedDocumentId, TableMapping, TableNamespace, TabletId, }; use crate::{ bootstrap_model::index_backfills::IndexBackfillModel, patch_value, query::TableFilter, reads::TransactionReadSet, system_tables::{ SystemIndex, SystemTable, }, table_summary::table_summary_bootstrapping_error, transaction_index::TransactionIndex, unauthorized_error, SystemMetadataModel, TableModel, Transaction, }; pub struct IndexTable; impl SystemTable for IndexTable { type Metadata = TabletIndexMetadata; fn table_name() -> &'static TableName { &INDEX_TABLE } fn indexes() -> Vec<SystemIndex<Self>> { vec![] } } pub struct IndexModel<'a, RT: Runtime> { tx: &'a mut Transaction<RT>, } impl<'a, RT: Runtime> IndexModel<'a, RT> { pub fn new(tx: &'a mut Transaction<RT>) -> Self { Self { tx } } /// Register given index. /// Indexes won't be backfilled and available for queries until after the /// transaction has committed. pub async fn add_application_index( &mut self, namespace: TableNamespace, index: IndexMetadata<TableName>, ) -> anyhow::Result<ResolvedDocumentId> { anyhow::ensure!( self.tx.identity().is_admin() || self.tx.identity().is_system(), unauthorized_error("add_index") ); anyhow::ensure!(!index.name.is_system_owned(), "Can't change system indexes"); let application_indexes = self.get_application_indexes(namespace).await?; // Indexes may exist in both a pending and an enabled state. If we're at or over // the index limit, we should still be able to add a new pending copy of // an enabled index with the expectation that the pending index will // replace the enabled index eventually. We must have other checks to // ensure we don't add multiple pending or enabled indexes, so // here we just verify we're not increasing the total number of indexes. let index_names_in_table = application_indexes .into_iter() .filter(|application_index| application_index.name.table() == index.name.table()) .map(|index| index.into_value().name) .collect::<BTreeSet<_>>(); anyhow::ensure!( index_names_in_table.contains(&index.name) || index_names_in_table.len() < MAX_INDEXES_PER_TABLE, index_validation_error::too_many_indexes(index.name.table(), MAX_INDEXES_PER_TABLE) ); self._add_index(namespace, index).await } /// Add system index. /// Indexes won't be backfilled and available for queries until /// after the transaction has committed. pub async fn add_system_index( &mut self, namespace: TableNamespace, index: IndexMetadata<TableName>, ) -> anyhow::Result<()> { anyhow::ensure!( self.tx.identity().is_admin() || self.tx.identity().is_system(), unauthorized_error("add_system_index") ); self._add_index(namespace, index).await?; Ok(()) } async fn _add_index( &mut self, namespace: TableNamespace, index: IndexMetadata<TableName>, ) -> anyhow::Result<ResolvedDocumentId> { // Make sure the table exists before creating the index. TableModel::new(self.tx) .insert_table_metadata(namespace, index.name.table()) .await?; let index: TabletIndexMetadata = index.map_table( &self .tx .table_mapping() .namespace(namespace) .name_to_tablet(), )?; SystemMetadataModel::new_global(self.tx) .insert_metadata(&INDEX_TABLE, index.try_into()?) .await } #[cfg(any(test, feature = "testing"))] pub async fn enable_index_for_testing( &mut self, namespace: TableNamespace, index: &IndexName, ) -> anyhow::Result<()> { let metadata = self .pending_index_metadata(namespace, index)? .ok_or_else(|| anyhow::anyhow!("Failed to find pending index: {}", index))?; self.enable_index(&metadata.into_value()).await?; Ok(()) } async fn enable_index(&mut self, backfilled_index: &TabletIndexMetadata) -> anyhow::Result<()> { anyhow::ensure!( self.tx.identity().is_admin() || self.tx.identity().is_system(), unauthorized_error("enable_index") ); anyhow::ensure!( !backfilled_index.name.is_by_id_or_creation_time(), "Can't change system indexes" ); let mut doc: ParsedDocument<TabletIndexMetadata> = self .pending_resolved_index_metadata(&backfilled_index.name) .ok_or_else(|| { anyhow::anyhow!( "Missing pending data for index: {:?}", backfilled_index.name ) })?; match doc.config { IndexConfig::Database { ref mut on_disk_state, .. } => match on_disk_state { DatabaseIndexState::Backfilling(_) | DatabaseIndexState::Enabled => { anyhow::bail!( "Expected backfilled index, but found: {:?} for {:?}", on_disk_state, backfilled_index.name.descriptor() ) }, DatabaseIndexState::Backfilled { .. } => { *on_disk_state = DatabaseIndexState::Enabled; }, }, IndexConfig::Text { ref mut on_disk_state, .. } => match on_disk_state { TextIndexState::Backfilled { snapshot, .. } => { *on_disk_state = TextIndexState::SnapshottedAt(snapshot.clone()); }, TextIndexState::Backfilling { .. } | TextIndexState::SnapshottedAt(_) => { anyhow::bail!( "Expected backfilled index, but found: {on_disk_state:?} for {:?}", backfilled_index.name.descriptor() ) }, }, IndexConfig::Vector { ref mut on_disk_state, .. } => match on_disk_state { VectorIndexState::Backfilled { snapshot, .. } => { *on_disk_state = VectorIndexState::SnapshottedAt(snapshot.clone()); }, VectorIndexState::Backfilling(_) | VectorIndexState::SnapshottedAt(_) => { anyhow::bail!( "Expected backfilled index, but found: {on_disk_state:?} for {:?}", backfilled_index.name.descriptor() ) }, }, }; let id = doc.id(); let table_id_metadata: TabletIndexMetadata = doc.into_value(); SystemMetadataModel::new_global(self.tx) .replace(id, table_id_metadata.try_into()?) .await?; Ok(()) } /// The old push flow split the index diff between indexes being added in /// the `prepare_schema` phase and indexes being deleted in the /// `apply_config` phase. /// /// This method merges the two halves to give a "full" index diff when the /// index changes have been prepared but not committed. #[fastrace::trace] pub async fn get_full_index_diff( &mut self, namespace: TableNamespace, next_schema: &Option<DatabaseSchema>, ) -> anyhow::Result<IndexDiff> { let empty = BTreeMap::new(); let tables_in_schema = next_schema .as_ref() .map(|schema| &schema.tables) .unwrap_or(&empty); let mut index_diff: IndexDiff = self.get_index_diff(namespace, tables_in_schema).await?; anyhow::ensure!(index_diff.added.is_empty(), "Expected no new indexes"); // Find all indexes that are being replaced by their pending variant to count as // added. index_diff.added = index_diff .identical .iter() .filter(|index| !index.config.is_enabled()) .map(|doc| doc.clone().into_value()) .collect(); index_diff .identical .retain(|index| index.config.is_enabled()); Ok(index_diff) } // This method assumes it's being called in apply_config, or at least after // indexes have been added and backfilled. #[fastrace::trace] pub async fn apply( &mut self, namespace: TableNamespace, next_schema: &Option<DatabaseSchema>, ) -> anyhow::Result<IndexDiff> { let empty = BTreeMap::new(); let tables_in_schema = next_schema .as_ref() .map(|schema| &schema.tables) .unwrap_or(&empty); // Without a schema id, we cannot accurately determine the status of // indexes. So for legacy CLIs, we do nothing here and instead rely // on build_indexes / legacy_get_indexes to commit index changes. let index_diff = self .commit_indexes_for_schema(namespace, tables_in_schema) .await?; tracing::info!( "Committed indexes for {namespace:?}: (added {}. dropped {})", index_diff.added.len(), index_diff.dropped.len(), ); Ok(index_diff) } pub async fn commit_indexes_for_schema( &mut self, namespace: TableNamespace, tables_in_schema: &BTreeMap<TableName, TableDefinition>, ) -> anyhow::Result<IndexDiff> { let index_diff: IndexDiff = self.get_index_diff(namespace, tables_in_schema).await?; let IndexDiff { added, identical, dropped, enabled, disabled, } = &index_diff; // New indexes should all have been added in prepare_schema, something has gone // wrong if we're trying to commit and realize there's a new index. anyhow::ensure!( added.is_empty(), "Trying to add new indexes when committing" ); for index in dropped { self.drop_index(index.id()).await?; } // Added indexes should have backfilled via build_indexes let mut table_model = TableModel::new(self.tx); let indexes_to_enable = identical .iter() .filter(|index| !index.config.is_enabled() && !index.config.is_staged()) .chain(enabled.iter()) .cloned() .map(|doc| table_model.doc_table_name_to_id(namespace, doc)) .collect::<anyhow::Result<Vec<_>>>()?; self.enable_backfilled_indexes(indexes_to_enable).await?; for index in disabled { self.disable_index(index.clone()).await?; } Ok(index_diff) } // Enables the given set of indexes if they're backfilled. pub async fn enable_backfilled_indexes( &mut self, indexes: Vec<ParsedDocument<TabletIndexMetadata>>, ) -> anyhow::Result<()> { for index in indexes { self.enable_index(&index).await?; IndexBackfillModel::new(self.tx) .delete_index_backfill(index.id()) .await?; } Ok(()) } async fn disable_index( &mut self, doc: ParsedDocument<DeveloperIndexMetadata>, ) -> anyhow::Result<()> { tracing::info!("Disabling index: {doc:?}"); let id = doc.id(); let new_config = match doc.into_value().config { IndexConfig::Database { spec, on_disk_state, } => match on_disk_state { DatabaseIndexState::Enabled => IndexConfig::Database { spec, on_disk_state: DatabaseIndexState::Backfilled { staged: true }, }, _ => { anyhow::bail!("Index is not enabled, so it cannot be disabled"); }, }, IndexConfig::Text { spec, on_disk_state, } => match on_disk_state { TextIndexState::SnapshottedAt(snapshot) => IndexConfig::Text { spec, on_disk_state: TextIndexState::Backfilled { snapshot, staged: true, }, }, _ => { anyhow::bail!("Index is not enabled, so it cannot be disabled"); }, }, IndexConfig::Vector { spec, on_disk_state, } => match on_disk_state { VectorIndexState::SnapshottedAt(snapshot) => IndexConfig::Vector { spec, on_disk_state: VectorIndexState::Backfilled { snapshot, staged: true, }, }, _ => { anyhow::bail!("Index is not enabled, so it cannot be disabled"); }, }, }; anyhow::ensure!(new_config.is_backfilled(), "Index is not backfilled"); SystemMetadataModel::new_global(self.tx) .patch(id, patch_value!("config" => Some(new_config.try_into()?))?) .await?; Ok(()) } /// Collect all the indexes in the new schema. /// /// An earlier step (JSON conversion) is responsible for ensuring that index /// names are unique. async fn indexes_in_new_schema( &mut self, tables_in_schema: &BTreeMap<TableName, TableDefinition>, ) -> anyhow::Result<BTreeMap<IndexName, IndexMetadata<TableName>>> { let mut indexes_in_schema: BTreeMap<IndexName, IndexMetadata<TableName>> = BTreeMap::new(); for (table_name, table_schema) in tables_in_schema { for (index_descriptor, index_schema) in &table_schema.indexes { let index_name = IndexName::new(table_name.clone(), index_descriptor.clone())?; let exists = indexes_in_schema.insert( index_name.clone(), IndexMetadata::new_backfilling( *self.tx.begin_timestamp(), index_name.clone(), index_schema.fields.clone(), ), ); anyhow::ensure!(exists.is_none(), "Index appears twice: {index_name}"); } for (index_descriptor, index_schema) in &table_schema.staged_db_indexes { let index_name = IndexName::new(table_name.clone(), index_descriptor.clone())?; let exists = indexes_in_schema.insert( index_name.clone(), IndexMetadata::new_staged_backfilling( *self.tx.begin_timestamp(), index_name.clone(), index_schema.fields.clone(), ), ); anyhow::ensure!(exists.is_none(), "Index appears twice: {index_name}"); } for (index_descriptor, index_schema) in &table_schema.text_indexes { let index_name = IndexName::new(table_name.clone(), index_descriptor.clone())?; let exists = indexes_in_schema.insert( index_name.clone(), IndexMetadata::new_backfilling_text_index( index_name.clone(), index_schema.search_field.clone(), index_schema.filter_fields.clone(), ), ); anyhow::ensure!(exists.is_none(), "Index appears twice: {index_name}"); } for (index_descriptor, index_schema) in &table_schema.staged_text_indexes { let index_name = IndexName::new(table_name.clone(), index_descriptor.clone())?; let exists = indexes_in_schema.insert( index_name.clone(), IndexMetadata::new_staged_backfilling_text_index( index_name.clone(), index_schema.search_field.clone(), index_schema.filter_fields.clone(), ), ); anyhow::ensure!(exists.is_none(), "Index appears twice: {index_name}"); } for (index_descriptor, index_schema) in &table_schema.vector_indexes { let index_name = IndexName::new(table_name.clone(), index_descriptor.clone())?; let exists = indexes_in_schema.insert( index_name.clone(), IndexMetadata::new_backfilling_vector_index( index_name.clone(), index_schema.vector_field.clone(), index_schema.dimension, index_schema.filter_fields.clone(), ), ); anyhow::ensure!(exists.is_none(), "Index appears twice: {index_name}"); } for (index_descriptor, index_schema) in &table_schema.staged_vector_indexes { let index_name = IndexName::new(table_name.clone(), index_descriptor.clone())?; let exists = indexes_in_schema.insert( index_name.clone(), IndexMetadata::new_staged_backfilling_vector_index( index_name.clone(), index_schema.vector_field.clone(), index_schema.dimension, index_schema.filter_fields.clone(), ), ); anyhow::ensure!(exists.is_none(), "Index appears twice: {index_name}"); } } Ok(indexes_in_schema) } /// Given a set of tables from a not yet fully committed schema, /// returns the difference between the indexes in those not yet committed /// tables and the indexes in storage. We compare only the developer config /// and pending/enabled state of the indexes to determine the diff. pub async fn get_index_diff( &mut self, namespace: TableNamespace, tables_in_new_schema: &BTreeMap<TableName, TableDefinition>, ) -> anyhow::Result<IndexDiff> { let indexes_in_schema = self.indexes_in_new_schema(tables_in_new_schema).await?; let indexes_no_longer_in_schema: HashMap< IndexName, Vec<ParsedDocument<DeveloperIndexMetadata>>, > = self .get_application_indexes(namespace) .await? .into_iter() .fold(HashMap::new(), |mut acc, index| { if !indexes_in_schema.contains_key(&index.name) { acc.entry(index.name.clone()).or_default().push(index); } acc }); let mut diff = IndexDiff::default(); for (_, new_index) in indexes_in_schema { match self.compare_new_and_existing_indexes(namespace, new_index)? { IndexComparison::Added(index) => diff.added.push(index), IndexComparison::Identical(index) => diff.identical.push(index), IndexComparison::Disabled(index) => diff.disabled.push(index), IndexComparison::Enabled(index) => diff.enabled.push(index), IndexComparison::Replaced { replaced, replacement, } => { for index in replaced { diff.dropped.push(index); } match replacement { ReplacementIndex::NewOrUpdated(index) => diff.added.push(index), ReplacementIndex::Identical(index) => diff.identical.push(index), } }, } } for (name, mut indexes) in indexes_no_longer_in_schema { anyhow::ensure!( !name.is_system_owned(), "Preparing to drop a system index: {:?}", name, ); diff.dropped.append(&mut indexes); } Ok(diff) } fn identical_or_replaced( &mut self, mut existing_index: ParsedDocument<DeveloperIndexMetadata>, new_index: DeveloperIndexMetadata, ) -> IndexComparison { if existing_index.config.same_spec(&new_index.config) { if existing_index.config.is_staged() == new_index.config.is_staged() { IndexComparison::Identical(existing_index) } else { // Staged status changed - use the previous on-disk state // Toggle the staged status and return enabled/disabled to // inform callsite. if existing_index.config.is_staged() { existing_index.config.set_staged(false); IndexComparison::Enabled(existing_index) } else { existing_index.config.set_staged(true); IndexComparison::Disabled(existing_index) } } } else { IndexComparison::Replaced { replaced: vec![existing_index], replacement: ReplacementIndex::NewOrUpdated(new_index), } } } fn compare_new_and_existing_indexes( &mut self, namespace: TableNamespace, new_index: DeveloperIndexMetadata, ) -> anyhow::Result<IndexComparison> { let pending_index = self.pending_index_metadata(namespace, &new_index.name)?; let enabled_index = self.enabled_index_metadata(namespace, &new_index.name)?; let mut table_model = TableModel::new(self.tx); let pending_index = pending_index .map(|doc| table_model.doc_table_id_to_name(doc)) .transpose()?; let enabled_index = enabled_index .map(|doc| table_model.doc_table_id_to_name(doc)) .transpose()?; Ok(match (enabled_index, pending_index) { (None, None) => IndexComparison::Added(new_index), (Some(enabled_index), None) => self.identical_or_replaced(enabled_index, new_index), (None, Some(pending_index)) => self.identical_or_replaced(pending_index, new_index), (Some(enabled_index), Some(pending_index)) => { let mut comparison = self.identical_or_replaced(pending_index.clone(), new_index); if let IndexComparison::Replaced { replaced, replacement: _, } = &mut comparison { // If the pending index has been mutated, we need to replace both the // pending and enabled indexes. anyhow::ensure!(*replaced == vec![pending_index.clone()]); *replaced = vec![enabled_index, pending_index]; } else if let IndexComparison::Identical(index) = &mut comparison { // If the pending index is identical to the new index, we need to replace // the enabled index with the pending index. anyhow::ensure!(index == &pending_index); comparison = IndexComparison::Replaced { replaced: vec![enabled_index], replacement: ReplacementIndex::Identical(pending_index), }; } comparison }, }) } /// Inserts new and updated mutated indexes so they can be backfilled. /// Returns the complete index diff, even though only the additions are /// immediately applied (the rest will be applied in apply_config) #[fastrace::trace] pub async fn prepare_new_and_mutated_indexes( &mut self, namespace: TableNamespace, schema: &DatabaseSchema, ) -> anyhow::Result<IndexDiff> { let diff: IndexDiff = self.get_index_diff(namespace, &schema.tables).await?; // If an index is currently pending and we're mutating it, we need to drop the // currently pending index immediately so that we avoid having multiple pending // indexes with the same name. This is still atomic(ish) because users cannot be // using the pending indexes we drop here. let dropped: Vec<ParsedDocument<IndexMetadata<TableName>>> = diff .dropped .iter() .filter(|index| !index.config.is_enabled()) .cloned() .collect(); let added = diff.added.clone(); tracing::info!( "Preparing new and mutated indexes. Adding {}. Dropping {}.", added.len(), dropped.len(), ); for index in &dropped { self.drop_index(index.id()).await?; } for index in &diff.added { self.add_application_index(namespace, index.clone()).await?; } // Replace all the enabled/disabled ones in-place to update their `staged` // status. The actual enabling/disabling will be done at finish_push // (apply_config). for index in diff.enabled.iter().chain(diff.disabled.iter()) { let (id, value) = index.clone().into_id_and_value(); let new_config = value.config.try_into()?; SystemMetadataModel::new_global(self.tx) .patch(id, patch_value!("config" => Some(new_config))?) .await?; } Ok(diff) } pub fn indexed_fields( &mut self, stable_index_name: &StableIndexName, printable_index_name: &IndexName, ) -> anyhow::Result<IndexedFields> { let resolved_index_name = stable_index_name .tablet_index_name() .with_context(|| index_not_found_error(printable_index_name))?; let metadata = self.require_enabled_index_metadata(printable_index_name, resolved_index_name)?; match metadata.config.clone() { IndexConfig::Database { spec: DatabaseIndexSpec { fields }, .. } => Ok(fields), _ => anyhow::bail!(index_not_a_database_index_error(printable_index_name)), } } /// Returns the index metadata for the given name if it's enabled or fails /// with a descriptive error if the index is either missing or not /// enabled. /// /// Queries and most other use cases should use this method or /// enable_index_metadata isntead of pending_index_metadata. fn require_enabled_index_metadata( &mut self, printable_index_name: &IndexName, resolved_index_name: &TabletIndexName, ) -> anyhow::Result<ParsedDocument<TabletIndexMetadata>> { // Because require_enabled does not mutate table_mapping, we can clone it here // to avoid duplicate mutable references to self.tx. If require_enabled // ever does start mutating the mapping, then this would be unsafe. Ok(self .tx .index .require_enabled( &mut self.tx.reads, resolved_index_name, printable_index_name, )? .metadata() .clone()) } /// Returns the index metadata for the given name if it's in the enabled /// state or None if the index either can't be found or is not enabled. /// /// Queries and most other use cases should use /// require_enabled_index_metadata or this method instead of /// pending_index_metadata. pub fn enabled_index_metadata( &mut self, namespace: TableNamespace, index_name: &IndexName, ) -> anyhow::Result<Option<ParsedDocument<TabletIndexMetadata>>> { self._index_metadata(namespace, index_name, |indexes, reads, index_name| { indexes.get_enabled(reads, &index_name) }) } /// Returns the index metadata for the given name if it's not yet enabled or /// None if the index either can't be found or is enabled. /// /// Only use this method when you're mutating indexes or their state. /// Queries and most other use cases should use enabled_index_metadata /// or require_enabled_index_metadata instead. pub fn pending_index_metadata( &mut self, namespace: TableNamespace, index_name: &IndexName, ) -> anyhow::Result<Option<ParsedDocument<TabletIndexMetadata>>> { self._index_metadata(namespace, index_name, |indexes, reads, index_name| { indexes.get_pending(reads, &index_name) }) } fn pending_resolved_index_metadata( &mut self, index_name: &TabletIndexName, ) -> Option<ParsedDocument<TabletIndexMetadata>> { let index = self.tx.index.get_pending(&mut self.tx.reads, index_name)?; Some(index.metadata.clone()) } fn _index_metadata<'b>( &'b mut self, namespace: TableNamespace, index_name: &IndexName, getter: impl FnOnce( &'b mut TransactionIndex, &'b mut TransactionReadSet, TabletIndexName, ) -> Option<&'b Index>, ) -> anyhow::Result<Option<ParsedDocument<TabletIndexMetadata>>> { if !self .tx .table_mapping() .namespace(namespace) .name_exists(index_name.table()) { return Ok(None); } let index_name = self.resolve_index_name(namespace, index_name)?; Ok(getter(&mut self.tx.index, &mut self.tx.reads, index_name) .map(|index| index.metadata.clone())) } pub fn stable_index_name( &mut self, namespace: TableNamespace, index_name: &IndexName, table_filter: TableFilter, ) -> anyhow::Result<StableIndexName> { if self .tx .virtual_system_mapping() .is_virtual_table(index_name.table()) { let physical_index_name = self .tx .virtual_system_mapping() .virtual_to_system_index(index_name)? .clone(); Ok(StableIndexName::Virtual( index_name.clone(), self.resolve_index_name(namespace, &physical_index_name)?, )) } else if self .tx .table_mapping() .namespace(namespace) .name_exists(index_name.table()) { match table_filter { TableFilter::IncludePrivateSystemTables => Ok(StableIndexName::Physical( self.resolve_index_name(namespace, index_name)?, )), TableFilter::ExcludePrivateSystemTables => { if index_name.table().is_system() { Ok(StableIndexName::Missing(index_name.clone())) } else { Ok(StableIndexName::Physical( self.resolve_index_name(namespace, index_name)?, )) } }, } } else { Ok(StableIndexName::Missing(index_name.clone())) } } fn resolve_index_name( &mut self, namespace: TableNamespace, index_name: &IndexName, ) -> anyhow::Result<TabletIndexName> { index_name.clone().map_table( &self .tx .table_mapping() .namespace(namespace) .name_to_tablet(), ) } /// Returns by_id indexes for *all tablets*, including hidden ones. pub async fn by_id_indexes(&mut self) -> anyhow::Result<BTreeMap<TabletId, IndexId>> { let all_indexes = self.get_all_indexes().await?; Ok(all_indexes .into_iter() .filter(|index| index.name.is_by_id()) .map(|index| (*index.name.table(), index.id().internal_id())) .collect()) } pub async fn by_id_index_metadata( &mut self, tablet_id: TabletId, ) -> anyhow::Result<ParsedDocument<TabletIndexMetadata>> { self.all_indexes_on_table(tablet_id) .await? .into_iter() .find(|index| index.name.is_by_id()) .ok_or_else(|| anyhow::anyhow!("by_id index missing for {tablet_id}")) } /// All indexes (system and developer-defined and /// backfilling/backfilled/enabled) for a single table. pub async fn all_indexes_on_table( &mut self, tablet_id: TabletId, ) -> anyhow::Result<Vec<ParsedDocument<TabletIndexMetadata>>> { let all_indexes = self.get_all_indexes().await?; Ok(all_indexes .into_iter() .filter(|index| *index.name.table() == tablet_id) .collect()) } /// Returns all registered indexes (both system and developer-defined) /// including both pending and enabled indexes. /// /// Because of mutated indexes, there may be up to two indexes with the same /// name (but different configurations). pub async fn get_all_indexes( &mut self, ) -> anyhow::Result<Vec<ParsedDocument<TabletIndexMetadata>>> { let index_by_id = TabletIndexName::by_id( self.tx .table_mapping() .namespace(TableNamespace::Global) .id(&INDEX_TABLE) .context("_index should exist")? .tablet_id, ); self.tx.reads.record_indexed_directly( index_by_id, IndexedFields::by_id(), Interval::all(), )?; Ok(self .tx .index .index_registry() .all_indexes() .cloned() .collect()) } /// Returns all search indexes (text and vector) on non-empty tables. pub async fn get_all_non_empty_search_indexes( &mut self, ) -> anyhow::Result<Vec<ParsedDocument<TabletIndexMetadata>>> { let all_indexes = self.get_all_indexes().await?; let mut non_empty_indexes = vec![]; for index in all_indexes { match index.config { IndexConfig::Text { .. } | IndexConfig::Vector { .. } => (), IndexConfig::Database { .. } => continue, }; let table = *index.name.table(); let Some(count) = self.tx.count_snapshot.count(table).await? else { return Err(table_summary_bootstrapping_error(Some( "Table count unavailable while bootstrapping", ))); }; if count != 0 { non_empty_indexes.push(index); } } Ok(non_empty_indexes) } /// Returns the index metadata matching the given id or fails if the /// document is missing or not an index. pub async fn require_index_by_id( &mut self, index_id: ResolvedDocumentId, ) -> anyhow::Result<ParsedDocument<TabletIndexMetadata>> { let document = self .tx .get(index_id) .await? .with_context(|| format!("Missing document for index id {index_id:?}"))?; TabletIndexMetadata::from_document(document) } /// Returns all registered indexes that are system owned including both /// pending and enabled indexes. /// /// Because of mutated indexes, there may be up to two indexes with the same /// name (but different configurations). pub async fn get_system_indexes( &mut self, namespace: TableNamespace, ) -> anyhow::Result<Vec<ParsedDocument<DeveloperIndexMetadata>>> { self.get_indexes(IndexCategory::System, namespace).await } /// Returns all registered indexes that aren't system owned including both /// pending and enabled indexes. /// /// Because of mutated indexes, there may be up to two indexes with the same /// name (but different configurations). pub async fn get_application_indexes( &mut self, namespace: TableNamespace, ) -> anyhow::Result<Vec<ParsedDocument<DeveloperIndexMetadata>>> { self.get_indexes(IndexCategory::Application, namespace) .await } async fn get_indexes( &mut self, category: IndexCategory, namespace: TableNamespace, ) -> anyhow::Result<Vec<ParsedDocument<DeveloperIndexMetadata>>> { let indexes = self.get_all_indexes().await?; let table_mapping = self.tx.table_mapping(); let mut result = vec![]; for doc in indexes { if !category.belongs(&doc, table_mapping) { continue; } let tablet_id = *doc.name.table(); if table_mapping.tablet_namespace(tablet_id)? != namespace { continue; } let doc = doc.map(|metadata| metadata.map_table(&table_mapping.tablet_to_name()))?; result.push(doc); } Ok(result) } pub async fn drop_index(&mut self, index_id: ResolvedDocumentId) -> anyhow::Result<()> { SystemMetadataModel::new_global(self.tx) .delete(index_id) .await?; Ok(()) } pub async fn drop_system_index( &mut self, namespace: TableNamespace, index_name: IndexName, ) -> anyhow::Result<()> { anyhow::ensure!(index_name.table().is_system()); let system_index = self .get_system_indexes(namespace) .await? .into_iter() .find(|index| index.name == index_name); if let Some(system_index) = system_index { tracing::info!("Dropping system index {index_name}"); self.drop_index(system_index.id()).await?; } Ok(()) } pub async fn copy_indexes_to_table( &mut self, namespace: TableNamespace, source_table: &TableName, target_table: TabletId, ) -> anyhow::Result<()> { // Copy over enabled indexes from existing active table, if any. let Some(active_table_id) = self .tx .table_mapping() .namespace(namespace) .id_if_exists(source_table) else { return Ok(()); }; for index in IndexModel::new(self.tx) .all_indexes_on_table(active_table_id) .await? { anyhow::ensure!( !index.config.is_backfilling(), ErrorMetadata::bad_request( "InvalidImport", format!( "{source_table} is still backfilling indexes, so it cannot be replaced. \ Wait for indexes to complete backfilling" ) ) ); if !index.config.is_enabled() { // Only copy Enabled indexes, otherwise we might get naming conflicts. continue; } if index.name.is_by_id_or_creation_time() { // by_id and by_creation_time already created. continue; } let index_name = if index.name.descriptor().is_reserved() { TabletIndexName::new_reserved(target_table, index.name.descriptor().clone())? } else { TabletIndexName::new(target_table, index.name.descriptor().clone())? }; let metadata = match index.into_value().config { IndexConfig::Database { spec: DatabaseIndexSpec { fields }, .. } => IndexMetadata::new_backfilling(*self.tx.begin_timestamp(), index_name, fields), IndexConfig::Text { spec: TextIndexSpec { search_field, filter_fields, }, .. } => IndexMetadata::new_backfilling_text_index( index_name, search_field, filter_fields, ), IndexConfig::Vector { spec: VectorIndexSpec { dimensions, vector_field, filter_fields, }, .. } => IndexMetadata::new_backfilling_vector_index( index_name, vector_field, dimensions, filter_fields, ), }; SystemMetadataModel::new_global(self.tx) .insert_metadata(&INDEX_TABLE, metadata.try_into()?) .await?; } Ok(()) } // Check if the system index is ready for all the given tables. // Useful for streaming import - waiting for the system indexes to be ready // for all the tables before proceeding with the import. pub async fn indexes_ready( &mut self, index_descriptor: &IndexDescriptor, indexes: &BTreeSet<TableName>, ) -> anyhow::Result<bool> { let index_metadata = indexes .iter() .map(|table_name| { let index_name = IndexName::new_reserved(table_name.clone(), index_descriptor.clone())?; // We really just want pending indexes here, but since it's convenient, we're // also verifying that all requested tables have the expected // index using enabled_index_metadata. let mut model = IndexModel::new(self.tx); let index_metadata = model .pending_index_metadata(TableNamespace::root_component(), &index_name)? .or(model .enabled_index_metadata(TableNamespace::root_component(), &index_name)?) .context(ErrorMetadata::bad_request( "MissingIndex", format!("Missing index: {index_name}"), ))?; Ok(index_metadata) }) .collect::<anyhow::Result<Vec<_>>>()?; let are_all_indexes_ready = index_metadata .iter() .all(|metadata| !metadata.config.is_backfilling()); Ok(are_all_indexes_ready) } } enum IndexCategory { System, Application, } impl IndexCategory { fn belongs( &self, index: &ParsedDocument<TabletIndexMetadata>, table_mapping: &TableMapping, ) -> bool { let is_system = index.name.descriptor().is_reserved() || table_mapping.is_system_tablet(*index.name.table()); let is_active = table_mapping.is_active(*index.name.table()); is_active && match self { Self::System => is_system, Self::Application => !is_system, } } } enum IndexComparison { Added(DeveloperIndexMetadata), Identical(ParsedDocument<DeveloperIndexMetadata>), Replaced { replaced: Vec<ParsedDocument<DeveloperIndexMetadata>>, replacement: ReplacementIndex, }, /// Enable a staged index Enabled(ParsedDocument<DeveloperIndexMetadata>), /// Disable a staged index Disabled(ParsedDocument<DeveloperIndexMetadata>), } enum ReplacementIndex { /// This replacement index is not yet in storage or its definition has /// changed. NewOrUpdated(DeveloperIndexMetadata), /// The replacement index is in storage and its definition has not changed. Identical(ParsedDocument<DeveloperIndexMetadata>), }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server