// transaction.rs
#[cfg(any(test, feature = "testing"))]
use std::fmt::Debug;
use std::{
    collections::{
        BTreeMap,
        BTreeSet,
    },
    mem,
    ops::Deref,
    sync::Arc,
};

use anyhow::Context;
use async_trait::async_trait;
use common::{
    bootstrap_model::{
        index::{
            database_index::IndexedFields,
            IndexMetadata,
            INDEX_TABLE,
        },
        schema::SchemaState,
        tables::{
            TableMetadata,
            TABLES_TABLE,
        },
    },
    components::{
        ComponentId,
        ComponentPath,
    },
    document::{
        CreationTime,
        DocumentUpdateWithPrevTs,
        ResolvedDocument,
    },
    identity::InertIdentity,
    index::{
        IndexKey,
        IndexKeyBytes,
    },
    interval::Interval,
    knobs::{
        TEXT_INDEX_SIZE_HARD_LIMIT,
        VECTOR_INDEX_SIZE_HARD_LIMIT,
    },
    persistence::RetentionValidator,
    query::{
        CursorPosition,
        Order,
        Search,
        SearchVersion,
    },
    runtime::Runtime,
    schemas::DatabaseSchema,
    sync::split_rw_lock::Reader,
    types::{
        GenericIndexName,
        IndexId,
        IndexName,
        PersistenceVersion,
        RepeatableTimestamp,
        StableIndexName,
        TableName,
        TableStats,
        TabletIndexName,
        WriteTimestamp,
    },
    value::{
        id_v6::DeveloperDocumentId,
        ConvexObject,
        ResolvedDocumentId,
        Size,
        TableMapping,
    },
    version::Version,
    virtual_system_mapping::VirtualSystemMapping,
};
use errors::ErrorMetadata;
use imbl::OrdMap;
use indexing::backend_in_memory_indexes::RangeRequest;
use keybroker::{
    Identity,
    UserIdentityAttributes,
};
use search::{
    metrics::SearchType,
    CandidateRevision,
};
use sync_types::{
    AuthenticationToken,
    Timestamp,
};
use tokio::task;
use usage_tracking::FunctionUsageTracker;
use value::{
    TableNamespace,
    TableNumber,
    TabletId,
};

use crate::{
    bootstrap_model::{
        defaults::BootstrapTableIds,
        table::{
            NUM_RESERVED_LEGACY_TABLE_NUMBERS,
            NUM_RESERVED_SYSTEM_TABLE_NUMBERS,
        },
    },
    committer::table_dependency_sort_key,
    execution_size::FunctionExecutionSize,
    metrics::{
        self,
        log_index_too_large_blocking_writes,
    },
    patch::PatchValue,
    preloaded::PreloadedIndexRange,
    query::{
        IndexRangeResponse,
        TableFilter,
    },
    reads::TransactionReadSet,
    schema_registry::SchemaRegistry,
    snapshot_manager::{
        Snapshot,
        SnapshotManager,
    },
    table_summary::table_summary_bootstrapping_error,
    token::Token,
    transaction_id_generator::TransactionIdGenerator,
    transaction_index::TransactionIndex,
    write_limits::BiggestDocumentWrites,
    writes::{
        NestedWriteToken,
        NestedWrites,
        TransactionWriteSize,
        Writes,
    },
    ComponentRegistry,
    IndexModel,
    ReadSet,
    SchemaModel,
    SystemMetadataModel,
    TableModel,
    TableRegistry,
    SCHEMAS_TABLE,
};

/// Safe default number of items to return for each list or filter operation
/// when we're writing internal code and don't know what other value to choose.
pub const DEFAULT_PAGE_SIZE: usize = 512;
pub const MAX_PAGE_SIZE: usize = 1024;

pub struct Transaction<RT: Runtime> {
    pub(crate) identity: Identity,
    pub(crate) id_generator: TransactionIdGenerator,
    pub(crate) next_creation_time: CreationTime,

    // Size of any functions scheduled from this transaction.
    pub scheduled_size: TransactionWriteSize,

    pub(crate) reads: TransactionReadSet,
    pub(crate) writes: NestedWrites<Writes>,

    pub(crate) index: NestedWrites<TransactionIndex>,
    pub(crate) metadata: NestedWrites<TableRegistry>,
    pub(crate) schema_registry: NestedWrites<SchemaRegistry>,
    pub(crate) component_registry: NestedWrites<ComponentRegistry>,
    pub(crate) count_snapshot: Arc<dyn TableCountSnapshot>,
    /// The change in the number of documents in tables that have had writes in
    /// this transaction. If there is no entry for a table, assume its delta
    /// is zero.
    pub(crate) table_count_deltas: BTreeMap<TabletId, i64>,

    pub(crate) stats: BTreeMap<TabletId, TableStats>,

    pub(crate) retention_validator: Arc<dyn RetentionValidator>,

    pub(crate) runtime: RT,

    pub usage_tracker: FunctionUsageTracker,
    pub(crate) virtual_system_mapping: VirtualSystemMapping,

    #[cfg(any(test, feature = "testing"))]
    index_size_override: Option<usize>,
}

#[cfg(any(test, feature = "testing"))]
impl<RT: Runtime> Debug for Transaction<RT> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Transaction").finish()
    }
}

#[async_trait]
pub trait TableCountSnapshot: Send + Sync + 'static {
    /// Returns the number of documents in the table at the timestamp of the
    /// snapshot.
    async fn count(&self, table: TabletId) -> anyhow::Result<Option<u64>>;
}
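// A minimal sketch of implementing `TableCountSnapshot` over a fixed map, e.g.
// as a test double; `FixedCountSnapshot` is a hypothetical type, not part of
// this crate:
//
//     struct FixedCountSnapshot {
//         counts: BTreeMap<TabletId, u64>,
//     }
//
//     #[async_trait]
//     impl TableCountSnapshot for FixedCountSnapshot {
//         async fn count(&self, table: TabletId) -> anyhow::Result<Option<u64>> {
//             // `None` would signal that the count is unavailable; a fixed map
//             // always has an answer.
//             Ok(self.counts.get(&table).copied())
//         }
//     }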
pub struct SubtransactionToken {
    writes: NestedWriteToken,
    index: NestedWriteToken,
    tables: NestedWriteToken,
    schema_registry: NestedWriteToken,
    component_registry: NestedWriteToken,
}

impl<RT: Runtime> Transaction<RT> {
    pub fn new(
        identity: Identity,
        id_generator: TransactionIdGenerator,
        creation_time: CreationTime,
        index: TransactionIndex,
        metadata: TableRegistry,
        schema_registry: SchemaRegistry,
        component_registry: ComponentRegistry,
        count: Arc<dyn TableCountSnapshot>,
        runtime: RT,
        usage_tracker: FunctionUsageTracker,
        retention_validator: Arc<dyn RetentionValidator>,
        virtual_system_mapping: VirtualSystemMapping,
    ) -> Self {
        Self {
            identity,
            reads: TransactionReadSet::new(),
            writes: NestedWrites::new(Writes::new()),
            id_generator,
            next_creation_time: creation_time,
            scheduled_size: TransactionWriteSize::default(),
            index: NestedWrites::new(index),
            metadata: NestedWrites::new(metadata),
            schema_registry: NestedWrites::new(schema_registry),
            component_registry: NestedWrites::new(component_registry),
            count_snapshot: count,
            table_count_deltas: BTreeMap::new(),
            stats: BTreeMap::new(),
            runtime,
            retention_validator,
            usage_tracker,
            virtual_system_mapping,
            #[cfg(any(test, feature = "testing"))]
            index_size_override: None,
        }
    }

    pub fn persistence_version(&self) -> PersistenceVersion {
        self.index.index_registry().persistence_version()
    }

    pub fn table_mapping(&mut self) -> &TableMapping {
        self.take_table_mapping_dep();
        self.metadata.table_mapping()
    }

    pub fn resolve_developer_id(
        &mut self,
        id: &DeveloperDocumentId,
        namespace: TableNamespace,
    ) -> anyhow::Result<ResolvedDocumentId> {
        id.to_resolved(self.table_mapping().namespace(namespace).number_to_tablet())
    }

    pub fn virtual_system_mapping(&self) -> &VirtualSystemMapping {
        &self.virtual_system_mapping
    }

    /// Checks both virtual tables and physical tables to get the table number
    /// to name mapping. If the table is excluded by `table_filter`, returns an
    /// error as if the table doesn't exist.
    pub fn all_tables_number_to_name(
        &mut self,
        namespace: TableNamespace,
        table_filter: TableFilter,
    ) -> impl Fn(TableNumber) -> anyhow::Result<TableName> + '_ {
        let table_mapping = self.table_mapping().namespace(namespace);
        let virtual_system_mapping = self.virtual_system_mapping().clone();
        move |number| {
            let name = table_mapping.number_to_name()(number)?;
            if let Some(virtual_name) = virtual_system_mapping.system_to_virtual_table(&name) {
                Ok(virtual_name.clone())
            } else {
                match table_filter {
                    TableFilter::IncludePrivateSystemTables => {},
                    TableFilter::ExcludePrivateSystemTables => {
                        anyhow::ensure!(!name.is_system(), "cannot find table {number:?}");
                    },
                }
                Ok(name)
            }
        }
    }

    pub fn bootstrap_tables(&mut self) -> BootstrapTableIds {
        BootstrapTableIds::new(self.table_mapping())
    }

    pub fn resolve_idv6(
        &mut self,
        id: DeveloperDocumentId,
        namespace: TableNamespace,
        table_filter: TableFilter,
    ) -> anyhow::Result<TableName> {
        match self.all_tables_number_to_name(namespace, table_filter)(id.table()) {
            Ok(table_name) => Ok(table_name),
            Err(_) => anyhow::bail!("Table for ID \"{}\" not found", id.encode()),
        }
    }

    pub fn runtime(&self) -> &RT {
        &self.runtime
    }

    pub fn identity(&self) -> &Identity {
        &self.identity
    }

    pub fn inert_identity(&self) -> InertIdentity {
        self.identity.clone().into()
    }

    pub fn user_identity(&self) -> Option<UserIdentityAttributes> {
        match self.identity.clone() {
            Identity::User(identity) => Some(identity.attributes),
            Identity::ActingUser(_, identity) => Some(identity),
            _ => None,
        }
    }

    pub fn authentication_token(&self) -> AuthenticationToken {
        self.identity.clone().into()
    }

    pub fn begin_timestamp(&self) -> RepeatableTimestamp {
        self.index.base_snapshot().timestamp()
    }

    pub fn is_readonly(&self) -> bool {
        self.writes.is_empty()
    }

    pub fn begin_subtransaction(&mut self) -> SubtransactionToken {
        SubtransactionToken {
            writes: self.writes.begin_nested(),
            index: self.index.begin_nested(),
            tables: self.metadata.begin_nested(),
            schema_registry: self.schema_registry.begin_nested(),
            component_registry: self.component_registry.begin_nested(),
        }
    }

    pub fn commit_subtransaction(&mut self, tokens: SubtransactionToken) -> anyhow::Result<()> {
        self.writes.commit_nested(tokens.writes)?;
        self.index.commit_nested(tokens.index)?;
        self.metadata.commit_nested(tokens.tables)?;
        self.schema_registry.commit_nested(tokens.schema_registry)?;
        self.component_registry
            .commit_nested(tokens.component_registry)?;
        Ok(())
    }

    pub fn rollback_subtransaction(&mut self, tokens: SubtransactionToken) -> anyhow::Result<()> {
        self.writes.rollback_nested(tokens.writes)?;
        self.index.rollback_nested(tokens.index)?;
        self.metadata.rollback_nested(tokens.tables)?;
        self.schema_registry
            .rollback_nested(tokens.schema_registry)?;
        self.component_registry
            .rollback_nested(tokens.component_registry)?;
        Ok(())
    }
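    // A minimal sketch of the begin/commit-or-rollback pattern these three
    // methods support (assuming `tx: &mut Transaction<RT>` and some fallible
    // `work` closure; both names are illustrative):
    //
    //     let token = tx.begin_subtransaction();
    //     match work(tx) {
    //         Ok(v) => {
    //             tx.commit_subtransaction(token)?;
    //             Ok(v)
    //         },
    //         Err(e) => {
    //             tx.rollback_subtransaction(token)?;
    //             Err(e)
    //         },
    //     }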
    pub fn require_not_nested(&self) -> anyhow::Result<()> {
        self.writes.require_not_nested()?;
        self.index.require_not_nested()?;
        self.metadata.require_not_nested()?;
        self.schema_registry.require_not_nested()?;
        self.component_registry.require_not_nested()?;
        Ok(())
    }

    pub fn writes(&self) -> &NestedWrites<Writes> {
        &self.writes
    }

    pub fn into_reads_and_writes(self) -> (TransactionReadSet, NestedWrites<Writes>) {
        (self.reads, self.writes)
    }

    pub fn biggest_document_writes(&self) -> Option<BiggestDocumentWrites> {
        let mut max_size = 0;
        let mut biggest_document_id = None;
        let mut max_nesting = 0;
        let mut most_nested_document_id = None;
        for (document_id, DocumentUpdateWithPrevTs { new_document, .. }) in
            self.writes.coalesced_writes()
        {
            let (size, nesting) = new_document
                .as_ref()
                .map(|document| (document.value().size(), document.value().nesting()))
                .unwrap_or((0, 0));
            if size > max_size {
                max_size = size;
                biggest_document_id = Some(document_id);
            }
            if nesting > max_nesting {
                max_nesting = nesting;
                most_nested_document_id = Some(document_id);
            }
        }
        if let Some(biggest_document_id) = biggest_document_id
            && let Some(most_nested_document_id) = most_nested_document_id
        {
            Some(BiggestDocumentWrites {
                max_size: ((*biggest_document_id).into(), max_size),
                max_nesting: ((*most_nested_document_id).into(), max_nesting),
            })
        } else {
            None
        }
    }

    pub fn execution_size(&self) -> FunctionExecutionSize {
        FunctionExecutionSize {
            num_intervals: self.reads.num_intervals(),
            read_size: self.reads.user_tx_size().to_owned(),
            write_size: self.writes.user_size().to_owned(),
            scheduled_size: self.scheduled_size.clone(),
        }
    }

    /// Applies the reads and writes from FunctionRunner to the Transaction.
    #[fastrace::trace]
    pub fn apply_function_runner_tx(
        &mut self,
        begin_timestamp: Timestamp,
        reads: ReadSet,
        num_intervals: usize,
        user_tx_size: crate::reads::TransactionReadSize,
        system_tx_size: crate::reads::TransactionReadSize,
        updates: OrdMap<ResolvedDocumentId, DocumentUpdateWithPrevTs>,
        rows_read_by_tablet: BTreeMap<TabletId, u64>,
    ) -> anyhow::Result<()> {
        anyhow::ensure!(
            *self.begin_timestamp() == begin_timestamp,
            "Timestamp mismatch"
        );

        self.reads
            .merge(reads, num_intervals, user_tx_size, system_tx_size);
        self.merge_writes(updates)?;
        for (tablet_id, rows_read) in rows_read_by_tablet {
            self.stats.entry(tablet_id).or_default().rows_read += rows_read;
        }
        Ok(())
    }

    // Checks that if this transaction already has some writes, they are included
    // in the given `updates`. This means the passed-in `updates` are a superset
    // of the existing updates on this transaction, and the merged-in writes
    // cannot modify documents already written to in this transaction.
    // In most scenarios this transaction will have no writes.
    pub fn merge_writes(
        &mut self,
        updates: OrdMap<ResolvedDocumentId, DocumentUpdateWithPrevTs>,
    ) -> anyhow::Result<()> {
        let existing_updates = self.writes().as_flat()?.clone().into_updates();
        let mut updates = updates.into_iter().collect::<Vec<_>>();
        updates.sort_by_key(|(id, update)| {
            table_dependency_sort_key(
                self.bootstrap_tables(),
                (*id).into(),
                update.new_document.as_ref(),
            )
        });

        let mut preserved_update_count = 0;
        for (id, update) in updates {
            // Ensure that the existing update matches, and that the merged-in
            // writes didn't otherwise modify documents already written to in
            // this transaction.
            if let Some(existing_update) = existing_updates.get(&id) {
                anyhow::ensure!(
                    *existing_update == update,
                    "Conflicting updates for document {id}"
                );
                preserved_update_count += 1;
                continue;
            }
            if let Some(ref document) = update.new_document {
                let doc_creation_time = document.creation_time();
                if doc_creation_time >= self.next_creation_time {
                    self.next_creation_time = doc_creation_time;
                    self.next_creation_time.increment()?;
                }
            }
            self.apply_validated_write(
                id,
                update
                    .old_document
                    .map(|(d, ts)| (d, WriteTimestamp::Committed(ts))),
                update.new_document,
            )?;
        }
        assert_eq!(
            preserved_update_count,
            existing_updates.len(),
            "Existing write was not preserved"
        );
        Ok(())
    }
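    // An illustrative walk-through of the superset invariant above, with
    // hypothetical document IDs `a` and `b`: if this transaction has already
    // written `a`, then
    //
    //     merge_writes(updates = {a: update_a, b: update_b})
    //
    // succeeds only when `update_a` is identical to the update already
    // recorded for `a` (it is counted as preserved, and only `b` is applied),
    // while `merge_writes(updates = {b: update_b})` fails the final
    // `assert_eq!` because the existing write for `a` was dropped.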
    /// Return the document with the given `id`, or None if the document
    /// doesn't exist.
    pub async fn get(
        &mut self,
        id: ResolvedDocumentId,
    ) -> anyhow::Result<Option<ResolvedDocument>> {
        Ok(self.get_with_ts(id).await?.map(|(doc, _)| doc))
    }

    #[fastrace::trace]
    #[convex_macro::instrument_future]
    pub async fn get_with_ts(
        &mut self,
        id: ResolvedDocumentId,
    ) -> anyhow::Result<Option<(ResolvedDocument, WriteTimestamp)>> {
        task::consume_budget().await;
        let table_name = match self.table_mapping().tablet_name(id.tablet_id) {
            Ok(t) => t,
            Err(_) => return Ok(None),
        };
        if self.virtual_system_mapping().is_virtual_table(&table_name) {
            anyhow::bail!("Virtual tables should use UserFacingModel::get_with_ts");
        }
        self.get_inner(id, table_name).await
    }

    #[convex_macro::instrument_future]
    pub(crate) async fn patch_inner(
        &mut self,
        id: ResolvedDocumentId,
        value: PatchValue,
    ) -> anyhow::Result<ResolvedDocument> {
        task::consume_budget().await;
        let table_name = self.table_mapping().tablet_name(id.tablet_id)?;
        let namespace = self.table_mapping().tablet_namespace(id.tablet_id)?;
        let (old_document, old_ts) = self
            .get_inner(id, table_name.clone())
            .await?
            .context(ErrorMetadata::bad_request(
                "NonexistentDocument",
                format!("Update on nonexistent document ID {id}"),
            ))?;
        let new_document = {
            let patched_value = value
                .clone()
                .apply(old_document.value().clone().into_value())?;
            old_document.replace_value(patched_value)?
        };
        SchemaModel::new(self, namespace)
            .enforce(&new_document)
            .await?;

        self.apply_validated_write(id, Some((old_document, old_ts)), Some(new_document.clone()))?;
        Ok(new_document)
    }

    pub fn is_system(&mut self, namespace: TableNamespace, table_number: TableNumber) -> bool {
        let tablet_id =
            self.table_mapping().namespace(namespace).number_to_tablet()(table_number).ok();
        tablet_id.is_some_and(|id| self.table_mapping().is_system_tablet(id))
    }

    #[convex_macro::instrument_future]
    pub(crate) async fn replace_inner(
        &mut self,
        id: ResolvedDocumentId,
        value: ConvexObject,
    ) -> anyhow::Result<ResolvedDocument> {
        task::consume_budget().await;
        let table_name = self.table_mapping().tablet_name(id.tablet_id)?;
        let namespace = self.table_mapping().tablet_namespace(id.tablet_id)?;
        let (old_document, old_ts) = self
            .get_inner(id, table_name)
            .await?
            .context(ErrorMetadata::bad_request(
                "NonexistentDocument",
                format!("Replace on nonexistent document ID {id}"),
            ))?;
        // Replace the document.
        let new_document = old_document.replace_value(value)?;
        SchemaModel::new(self, namespace)
            .enforce(&new_document)
            .await?;
        self.apply_validated_write(
            new_document.id(),
            Some((old_document, old_ts)),
            Some(new_document.clone()),
        )?;
        Ok(new_document)
    }

    #[convex_macro::instrument_future]
    pub async fn delete_inner(
        &mut self,
        id: ResolvedDocumentId,
    ) -> anyhow::Result<ResolvedDocument> {
        task::consume_budget().await;
        let table_name = self.table_mapping().tablet_name(id.tablet_id)?;
        let (document, ts) = self
            .get_inner(id, table_name)
            .await?
            .context(ErrorMetadata::bad_request(
                "NonexistentDocument",
                format!("Delete on nonexistent document ID {id}"),
            ))?;
        self.apply_validated_write(document.id(), Some((document.clone(), ts)), None)?;
        Ok(document)
    }
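    // A minimal usage sketch (assuming `tx: &mut Transaction<RT>` and a valid
    // `id: ResolvedDocumentId` for a non-virtual table):
    //
    //     if let Some((doc, ts)) = tx.get_with_ts(id).await? {
    //         // `ts` distinguishes documents written earlier in this
    //         // transaction (a pending write timestamp) from ones that were
    //         // already committed.
    //     }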
    #[fastrace::trace]
    #[convex_macro::instrument_future]
    pub async fn count(
        &mut self,
        namespace: TableNamespace,
        table: &TableName,
    ) -> anyhow::Result<Option<u64>> {
        let virtual_system_mapping = self.virtual_system_mapping().clone();
        let system_table = if virtual_system_mapping.is_virtual_table(table) {
            virtual_system_mapping.virtual_to_system_table(table)?
        } else {
            table
        };
        TableModel::new(self).count(namespace, system_table).await
    }

    #[fastrace::trace]
    #[convex_macro::instrument_future]
    pub async fn must_count(
        &mut self,
        namespace: TableNamespace,
        table: &TableName,
    ) -> anyhow::Result<u64> {
        TableModel::new(self)
            .count(namespace, table)
            .await?
            .ok_or_else(|| {
                table_summary_bootstrapping_error(Some(
                    "Table count unavailable while bootstrapping",
                ))
            })
    }
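    // A usage sketch (assuming `tx: &mut Transaction<RT>` and a table name):
    // `count` yields Ok(None) while table summaries are still bootstrapping,
    // whereas `must_count` turns that case into an error:
    //
    //     if let Some(n) = tx.count(TableNamespace::Global, &table_name).await? {
    //         // Count is available at this transaction's snapshot.
    //     }
    //     let n = tx.must_count(TableNamespace::Global, &table_name).await?;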
    pub fn into_token(self) -> anyhow::Result<Token> {
        if !self.is_readonly() {
            anyhow::bail!("Transaction isn't readonly");
        }
        metrics::log_read_tx(&self);
        let ts = *self.begin_timestamp();
        Ok(Token::new(Arc::new(self.reads.into_read_set()), ts))
    }

    pub fn take_stats(&mut self) -> BTreeMap<TableName, TableStats> {
        let stats = mem::take(&mut self.stats);
        stats
            .into_iter()
            .map(|(tablet, stats)| {
                (
                    match self.table_mapping().tablet_name(tablet) {
                        Ok(name) => name,
                        Err(_) => {
                            // This is unusual, but possible if the tablet was created in a
                            // subtransaction that was rolled back. Such a tablet never gets
                            // created, but might still have usage stats.
                            tracing::warn!("Tablet {tablet} does not exist");
                            // It's fine to return "_unknown" here because nothing requires
                            // these to correspond to an actual table.
                            //
                            // We use "_unknown" to avoid colliding with valid user table names.
                            "_unknown"
                                .parse()
                                .expect("'_unknown' should be a valid table name")
                        },
                    },
                    stats,
                )
            })
            .collect()
    }

    pub fn stats_by_tablet(&self) -> &BTreeMap<TabletId, TableStats> {
        &self.stats
    }

    fn take_table_mapping_dep(&mut self) {
        let tables_by_id = TabletIndexName::by_id(
            self.metadata
                .table_mapping()
                .namespace(TableNamespace::Global)
                .id(&TABLES_TABLE)
                .expect("_tables should exist")
                .tablet_id,
        );
        self.reads
            .record_indexed_derived(tables_by_id, IndexedFields::by_id(), Interval::all());
    }

    /// Reads the schema from the cache and records a read dependency.
    /// Used by SchemaModel.
    pub(crate) fn get_schema_by_state(
        &mut self,
        namespace: TableNamespace,
        state: SchemaState,
    ) -> anyhow::Result<Option<(ResolvedDocumentId, Arc<DatabaseSchema>)>> {
        if !self
            .table_mapping()
            .namespace(namespace)
            .name_exists(&SCHEMAS_TABLE)
        {
            return Ok(None);
        }
        let schema_tablet = self
            .table_mapping()
            .namespace(namespace)
            .id(&SCHEMAS_TABLE)?
            .tablet_id;
        self.schema_registry
            .get_by_state(namespace, state, schema_tablet, &mut self.reads)
    }

    pub fn get_component_path(&mut self, component_id: ComponentId) -> Option<ComponentPath> {
        self.component_registry
            .get_component_path(component_id, &mut self.reads)
    }

    pub fn must_component_path(
        &mut self,
        component_id: ComponentId,
    ) -> anyhow::Result<ComponentPath> {
        self.component_registry
            .must_component_path(component_id, &mut self.reads)
    }

    /// Get the component path for a document ID. This might be None when table
    /// namespaces for new components are created in `start_push`, but the
    /// components have not yet been created.
    pub fn component_path_for_document_id(
        &mut self,
        id: ResolvedDocumentId,
    ) -> anyhow::Result<Option<ComponentPath>> {
        self.component_registry.component_path_from_document_id(
            self.metadata.table_mapping(),
            id,
            &mut self.reads,
        )
    }

    // XXX: move to table model?
    #[cfg(any(test, feature = "testing"))]
    pub async fn create_system_table_testing(
        &mut self,
        namespace: TableNamespace,
        table_name: &TableName,
        default_table_number: Option<TableNumber>,
    ) -> anyhow::Result<bool> {
        self.create_system_table(namespace, table_name, default_table_number)
            .await
    }

    async fn table_number_for_system_table(
        &mut self,
        namespace: TableNamespace,
        table_name: &TableName,
        default_table_number: Option<TableNumber>,
    ) -> anyhow::Result<TableNumber> {
        Ok(if let Some(default_table_number) = default_table_number {
            let existing_name =
                self.table_mapping().namespace(namespace).number_to_name()(default_table_number)
                    .ok();
            let table_number = if let Some(existing_name) = existing_name {
                if self.virtual_system_mapping.is_virtual_table(table_name)
                    && *self
                        .virtual_system_mapping
                        .virtual_to_system_table(table_name)?
                        == existing_name
                {
                    // The table number is occupied by the system table backing the
                    // virtual table we're creating. Allow it.
                    default_table_number
                } else if cfg!(any(test, feature = "testing")) {
                    // In tests, fail hard on conflicting default table numbers. In a
                    // real system, fall back to picking another table number.
                    anyhow::bail!(
                        "{default_table_number} is used by both {table_name} and {existing_name}"
                    );
                } else {
                    // If the requested table_number is taken, just pick a higher table
                    // number. This can happen on older backends that have
                    // lower-numbered system tables.
                    TableModel::new(self)
                        .next_system_table_number(namespace)
                        .await?
                }
            } else {
                default_table_number
            };
            // TODO(CX-6699): handle system table number exhaustion.
            anyhow::ensure!(
                table_number < TableNumber::try_from(NUM_RESERVED_SYSTEM_TABLE_NUMBERS)?,
                "{table_number} picked for system table {table_name} is reserved for user tables"
            );
            anyhow::ensure!(
                table_number >= TableNumber::try_from(NUM_RESERVED_LEGACY_TABLE_NUMBERS)?,
                "{table_number} picked for system table {table_name} is reserved for legacy tables"
            );
            table_number
        } else {
            TableModel::new(self)
                .next_system_table_number(namespace)
                .await?
        })
    }

    /// Creates a new system table, with `_id` and `_creationTime` indexes.
    /// Returns false if the table already existed.
    pub async fn create_system_table(
        &mut self,
        namespace: TableNamespace,
        table_name: &TableName,
        default_table_number: Option<TableNumber>,
    ) -> anyhow::Result<bool> {
        anyhow::ensure!(self.identity().is_system());
        anyhow::ensure!(
            table_name.is_system(),
            "{table_name:?} is not a valid system table name!"
        );

        let is_new = !TableModel::new(self).table_exists(namespace, table_name);
        if is_new {
            let table_number = self
                .table_number_for_system_table(namespace, table_name, default_table_number)
                .await?;
            let metadata = TableMetadata::new(namespace, table_name.clone(), table_number);
            let table_doc_id = SystemMetadataModel::new_global(self)
                .insert(&TABLES_TABLE, metadata.try_into()?)
                .await?;
            let tablet_id = TabletId(table_doc_id.internal_id());

            let by_id_index = IndexMetadata::new_enabled(
                GenericIndexName::by_id(tablet_id),
                IndexedFields::by_id(),
            );
            SystemMetadataModel::new_global(self)
                .insert(&INDEX_TABLE, by_id_index.try_into()?)
                .await?;
            let metadata = IndexMetadata::new_enabled(
                GenericIndexName::by_creation_time(tablet_id),
                IndexedFields::creation_time(),
            );
            SystemMetadataModel::new_global(self)
                .insert(&INDEX_TABLE, metadata.try_into()?)
                .await?;
            tracing::info!("Created system table: {table_name}");
        } else {
            tracing::debug!("Skipped creating system table {table_name} since it already exists");
        };
        Ok(is_new)
    }
}
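// A usage sketch of system table creation (hypothetical table name; requires a
// system identity, and the name must start with an underscore to be a valid
// system table name):
//
//     let table_name: TableName = "_my_metadata".parse()?;
//     let created = tx
//         .create_system_table(TableNamespace::Global, &table_name, None)
//         .await?;
//     // `created` is false if the table already existed.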
// Private methods for `Transaction`: Place all authorization checks closer to
// the public interface.
impl<RT: Runtime> Transaction<RT> {
    pub(crate) async fn get_inner(
        &mut self,
        id: ResolvedDocumentId,
        table_name: TableName,
    ) -> anyhow::Result<Option<(ResolvedDocument, WriteTimestamp)>> {
        let index_name = TabletIndexName::by_id(id.tablet_id);
        let printable_index_name = IndexName::by_id(table_name.clone());
        let index_key = IndexKey::new(vec![], id.into());
        let interval = Interval::prefix(index_key.to_bytes().into());
        let range_request = RangeRequest {
            index_name: index_name.clone(),
            printable_index_name,
            interval: interval.clone(),
            order: Order::Asc,
            // Request 2 to best-effort verify uniqueness of by_id index.
            max_size: 2,
        };
        let [result] = self
            .index
            .range_batch(&[&range_request])
            .await
            .try_into()
            .map_err(|_| anyhow::anyhow!("expected result"))?;
        self.reads
            .record_indexed_directly(index_name, IndexedFields::by_id(), interval)?;

        let IndexRangeResponse {
            page: range_results,
            cursor,
        } = result?;
        if range_results.len() > 1 {
            Err(anyhow::anyhow!("Got multiple values for id {id:?}"))?;
        }
        if !matches!(cursor, CursorPosition::End) {
            Err(anyhow::anyhow!(
                "Querying 2 items for a single id didn't exhaust interval for {id:?}"
            ))?;
        }
        let result = match range_results.into_iter().next() {
            Some((_, doc, timestamp)) => {
                let is_virtual_table = self.virtual_system_mapping().is_virtual_table(&table_name);
                let component_path = self
                    .component_path_for_document_id(doc.id())?
                    .unwrap_or_default();
                self.reads.record_read_document(
                    component_path,
                    table_name,
                    doc.size(),
                    &self.usage_tracker,
                    is_virtual_table,
                )?;
                Some((doc, timestamp))
            },
            None => None,
        };
        self.stats.entry(id.tablet_id).or_default().rows_read += 1;
        Ok(result)
    }
    /// Apply a validated write to the [Transaction], updating the
    /// [IndexRegistry] and [TableRegistry]. Validated means the write
    /// has already been checked for schema enforcement.
    pub(crate) fn apply_validated_write(
        &mut self,
        id: ResolvedDocumentId,
        old_document_and_ts: Option<(ResolvedDocument, WriteTimestamp)>,
        new_document: Option<ResolvedDocument>,
    ) -> anyhow::Result<()> {
        // Implement something like two-phase commit between the index and the
        // document store. We first guarantee that the changes are valid for the
        // index and metadata, and then make inserting into `writes` the commit
        // point, so that the Transaction is never in an inconsistent state.
        let is_system_document = self.table_mapping().is_system_tablet(id.tablet_id);
        let bootstrap_tables = self.bootstrap_tables();
        let old_document = old_document_and_ts.as_ref().map(|(doc, _)| doc);
        let index_update = self
            .index
            .begin_update(old_document.cloned(), new_document.clone())?;
        let schema_update = self.schema_registry.begin_update(
            self.metadata.table_mapping(),
            id,
            old_document,
            new_document.as_ref(),
        )?;
        let component_update = self.component_registry.begin_update(
            self.metadata.table_mapping(),
            id,
            old_document,
            new_document.as_ref(),
        )?;
        let metadata_update = self.metadata.begin_update(
            index_update.registry(),
            id,
            old_document.map(|d| d.value().deref()),
            new_document.as_ref().map(|d| d.value().deref()),
        )?;
        let stats = self.stats.entry(id.tablet_id).or_default();
        let mut delta = 0;
        match (old_document, new_document.as_ref()) {
            (None, None) => {
                stats.rows_deleted += 1;
                stats.rows_created += 1;
            },
            (Some(_previous), None) => {
                // Delete
                stats.rows_deleted += 1;
                delta = -1;
            },
            (old, Some(_new)) => {
                // Insert or replace
                if old.is_none() {
                    // Insert
                    stats.rows_created += 1;
                    delta = 1;
                }
            },
        };
        // NB: Writes::update is fallible, so be sure to call it before applying the
        // index and metadata updates.
        self.writes.update(
            bootstrap_tables,
            is_system_document,
            &mut self.reads,
            id,
            old_document_and_ts,
            new_document,
        )?;
        stats.rows_written += 1;
        index_update.apply();
        metadata_update.apply();
        schema_update.apply();
        component_update.apply();
        *self.table_count_deltas.entry(id.tablet_id).or_default() += delta;
        Ok(())
    }

    pub(crate) async fn insert_document(
        &mut self,
        document: ResolvedDocument,
    ) -> anyhow::Result<ResolvedDocumentId> {
        let document_id = document.id();
        let namespace = self
            .table_mapping()
            .tablet_namespace(document_id.tablet_id)?;
        SchemaModel::new(self, namespace).enforce(&document).await?;
        self.apply_validated_write(document_id, None, Some(document))?;
        Ok(document_id)
    }

    pub async fn search(
        &mut self,
        stable_index_name: &StableIndexName,
        search: &Search,
        version: SearchVersion,
    ) -> anyhow::Result<Vec<(CandidateRevision, IndexKeyBytes)>> {
        let Some(tablet_index_name) = stable_index_name.tablet_index_name() else {
            return Ok(vec![]);
        };
        let search = search.clone().to_internal(tablet_index_name.clone())?;
        self.index
            .search(&mut self.reads, &search, tablet_index_name.clone(), version)
            .await
    }

    // TODO(lee): Make this private.
    // We ideally want the transaction to call this internally so the caller
    // doesn't have to. However, this is currently hard since the query layer
    // doesn't persist a stream.
    pub fn record_read_document(
        &mut self,
        document: &ResolvedDocument,
        table_name: &TableName,
    ) -> anyhow::Result<()> {
        let is_virtual_table = self.virtual_system_mapping().is_virtual_table(table_name);
        let component_path = self
            .component_path_for_document_id(document.id())?
            .unwrap_or_default();
        self.reads.record_read_document(
            component_path,
            table_name.clone(),
            document.size(),
            &self.usage_tracker,
            is_virtual_table,
        )
    }
    // Preload an index range against the transaction, building a snapshot of
    // all its current values for future use. Preloading has a few limitations:
    //
    // - It doesn't reflect subsequent updates to the index.
    // - It currently only supports unique database indexes with a single indexed
    //   field.
    // - It loads the entirety of the index into memory.
    // - The preloaded snapshot only permits point queries on the index key.
    pub async fn preload_index_range(
        &mut self,
        namespace: TableNamespace,
        index_name: &IndexName,
        interval: &Interval,
    ) -> anyhow::Result<PreloadedIndexRange> {
        let stable_index_name = IndexModel::new(self).stable_index_name(
            namespace,
            index_name,
            TableFilter::IncludePrivateSystemTables,
        )?;
        let StableIndexName::Physical(tablet_index_name) = stable_index_name else {
            anyhow::bail!(
                "Can only preload ranges on physical tables. Failed for index: {index_name} with \
                 {stable_index_name:?}"
            );
        };
        self.index
            .preload_index_range(&mut self.reads, &tablet_index_name, index_name, interval)
            .await
    }

    #[cfg(any(test, feature = "testing"))]
    pub fn set_index_size_hard_limit(&mut self, size: usize) {
        self.index_size_override = Some(size);
    }

    pub async fn finalize(
        self,
        snapshot_reader: Reader<SnapshotManager>,
    ) -> anyhow::Result<FinalTransaction> {
        FinalTransaction::new(self, snapshot_reader).await
    }
}
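// A minimal sketch of preloading (hypothetical `index_name`), given the
// limitations listed on `preload_index_range` above; subsequent point lookups
// are served from the returned PreloadedIndexRange rather than from fresh
// index reads:
//
//     let preloaded = tx
//         .preload_index_range(TableNamespace::Global, &index_name, &Interval::all())
//         .await?;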
#[derive(Debug)]
pub struct IndexRangeRequest {
    pub stable_index_name: StableIndexName,
    pub interval: Interval,
    pub order: Order,
    pub max_rows: usize,
    pub version: Option<Version>,
}

/// FinalTransaction is a finalized Transaction.
/// After all persistence reads have been performed and validated, and all
/// writes have been staged, a FinalTransaction stores the transaction until it
/// has been fully committed.
pub struct FinalTransaction {
    pub(crate) begin_timestamp: RepeatableTimestamp,
    pub(crate) table_mapping: TableMapping,
    pub(crate) component_registry: ComponentRegistry,

    pub(crate) reads: TransactionReadSet,
    pub(crate) writes: Writes,

    pub(crate) usage_tracker: FunctionUsageTracker,
}

impl FinalTransaction {
    pub fn is_readonly(&self) -> bool {
        self.writes.is_empty()
    }

    pub async fn new<RT: Runtime>(
        mut transaction: Transaction<RT>,
        snapshot_reader: Reader<SnapshotManager>,
    ) -> anyhow::Result<Self> {
        // All subtransactions must have been committed or rolled back.
        transaction.require_not_nested()?;
        let begin_timestamp = transaction.begin_timestamp();
        let table_mapping = transaction.table_mapping().clone();
        let component_registry = transaction.component_registry.deref().clone();
        // Note that we do a best-effort validation of in-memory index sizes. We
        // use the latest snapshot instead of the transaction base snapshot. This
        // is both more accurate and also avoids pedant hitting transient errors.
        let latest_snapshot = snapshot_reader.lock().latest_snapshot();
        Self::validate_memory_index_sizes(&table_mapping, &transaction, &latest_snapshot)?;
        Ok(Self {
            begin_timestamp,
            table_mapping,
            component_registry,
            reads: transaction.reads,
            writes: transaction.writes.into_flat()?,
            usage_tracker: transaction.usage_tracker.clone(),
        })
    }

    fn validate_memory_index_sizes<RT: Runtime>(
        table_mapping: &TableMapping,
        transaction: &Transaction<RT>,
        base_snapshot: &Snapshot,
    ) -> anyhow::Result<()> {
        #[allow(unused_mut)]
        let mut vector_size_limit = *VECTOR_INDEX_SIZE_HARD_LIMIT;
        #[cfg(any(test, feature = "testing"))]
        if let Some(size) = transaction.index_size_override {
            vector_size_limit = size;
        }

        #[allow(unused_mut)]
        let mut search_size_limit = *TEXT_INDEX_SIZE_HARD_LIMIT;
        #[cfg(any(test, feature = "testing"))]
        if let Some(size) = transaction.index_size_override {
            search_size_limit = size;
        }

        let modified_tables: BTreeSet<_> = transaction
            .writes
            .as_flat()?
            .coalesced_writes()
            .map(|(id, _)| id.tablet_id)
            .collect();
        Self::validate_memory_index_size(
            table_mapping,
            base_snapshot,
            &modified_tables,
            base_snapshot.text_indexes.in_memory_sizes().into_iter(),
            search_size_limit,
            SearchType::Text,
        )?;
        Self::validate_memory_index_size(
            table_mapping,
            base_snapshot,
            &modified_tables,
            base_snapshot.vector_indexes.in_memory_sizes().into_iter(),
            vector_size_limit,
            SearchType::Vector,
        )?;
        Ok(())
    }

    fn validate_memory_index_size(
        table_mapping: &TableMapping,
        base_snapshot: &Snapshot,
        modified_tables: &BTreeSet<TabletId>,
        iterator: impl Iterator<Item = (IndexId, usize)>,
        hard_limit: usize,
        index_type: SearchType,
    ) -> anyhow::Result<()> {
        for (index_id, size) in iterator {
            if size < hard_limit {
                continue;
            }
            // Note that we are getting an index by name without adding any read
            // dependency. This is fine since we are only deciding whether to let
            // the transaction through or not. If we do not, we will throw a
            // non-JS error.
            let index = base_snapshot
                .index_registry
                .enabled_index_by_index_id(&index_id)
                .cloned()
                .with_context(|| anyhow::anyhow!("failed to find index id {index_id}"))?
                .name();
            if !modified_tables.contains(index.table()) {
                // NOTE: All operations make the in-memory size larger, including
                // deletes and updates with smaller values. So we reject any
                // modification if we are over the limit.
                continue;
            }
            tracing::error!(
                "Index size for index_id {index_id} is {size}, limit for {} is {hard_limit}",
                index_type.as_ref()
            );
            log_index_too_large_blocking_writes(index_type);
            anyhow::bail!(ErrorMetadata::overloaded(
                format!("{}IndexTooLarge", index_type.as_ref()),
                format!(
                    "Too many writes to {}. Spread your writes out over time or throttle them \
                     to avoid errors. If you’re importing data into a new application, consider \
                     removing the index and adding it again after the import (you can re-add \
                     the index as a staged index to avoid blocking your pushes: \
                     https://docs.convex.dev/database/reading-data/indexes/#staged-indexes).",
                    index.map_table(&table_mapping.tablet_to_name())?
                )
            ))
        }
        Ok(())
    }
}
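// An end-of-life sketch (assuming `tx: Transaction<RT>` and a
// `snapshot_reader: Reader<SnapshotManager>`): a read-only transaction can
// become a subscription Token, while a read-write transaction is finalized
// for commit:
//
//     if tx.is_readonly() {
//         let token = tx.into_token()?;
//         // ... register a subscription with `token`
//     } else {
//         let final_tx = tx.finalize(snapshot_reader).await?;
//         // ... hand `final_tx` to the committer
//     }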
