//! Bulk import of external data into Convex.
//!
//! The core import logic lives in
//! [SnapshotImportExecutor::attempt_perform_import]. The flow is roughly:
//! - [SnapshotImportExecutor::parse_import] -> [parse_import_file] creates a
//!   [ParsedImport] from the import file.
//! - For a zip file, this walks the zip directory and creates a lazy
//!   [ImportDocumentStream] for each table (found in
//!   [ParsedImport::documents]). This works because zip files are seekable.
//! - Other import formats resolve to just one table.
//! - At the same time, we save a copy of the schemas from the database.
//! - [import_objects] copies data from the [ParsedImport] into the database,
//!   writing into hidden tables (except in [ImportMode::Append]).
//! - During a multi-table import, we [assign_table_numbers] and create hidden
//!   tables via [prepare_table_for_import] before writing any data. This
//!   requires reading the `_tables` tables first (if present) to find table
//!   numbers; we also attempt to guess table numbers by reading `_id` fields
//!   (see [table_number_for_import]).
//! - Schemas are validated using a simulated version of the final table
//!   mapping.
//! - [finalize_import] deletes tables that are being replaced and promotes
//!   hidden tables to active, finishing the import. This does more schema
//!   checks (see [schema_constraints]), and additionally checks again for
//!   table number uniqueness, which could fail if there were racing table
//!   mapping changes. See [TableModel::activate_tables].

use std::{
    collections::{
        btree_map::Entry,
        BTreeMap,
        BTreeSet,
        HashSet,
    },
    pin::Pin,
    sync::Arc,
    time::Duration,
};

use anyhow::Context;
use bytes::Bytes;
use common::{
    bootstrap_model::{
        schema::SchemaState,
        tables::TABLES_TABLE,
    },
    comparators::tuple::two::TupleKey,
    components::{
        ComponentId,
        ComponentPath,
    },
    document::{
        CreationTime,
        ParsedDocument,
        ID_FIELD,
    },
    errors::report_error,
    execution_context::ExecutionId,
    knobs::{
        MAX_IMPORT_AGE,
        TRANSACTION_MAX_NUM_USER_WRITES,
        TRANSACTION_MAX_USER_WRITE_SIZE_BYTES,
    },
    runtime::{
        assert_send,
        Runtime,
    },
    types::{
        FullyQualifiedObjectKey,
        MemberId,
        TableName,
        UdfIdentifier,
    },
    RequestId,
};
use database::{
    BootstrapComponentsModel,
    Database,
    ImportFacingModel,
    IndexModel,
    SchemaModel,
    TableModel,
    Transaction,
};
use errors::{
    ErrorMetadata,
    ErrorMetadataAnyhowExt,
};
use file_storage::FileStorage;
use futures::{
    stream::{
        self,
        BoxStream,
        Peekable,
    },
    Stream,
    StreamExt,
    TryStreamExt,
};
use keybroker::Identity;
use model::{
    deployment_audit_log::{
        types::DeploymentAuditLogEvent,
        DeploymentAuditLogModel,
    },
    file_storage::{
        FILE_STORAGE_TABLE,
        FILE_STORAGE_VIRTUAL_TABLE,
    },
    snapshot_imports::{
        types::{
            ImportFormat,
            ImportMode,
            ImportRequestor,
            ImportState,
            SnapshotImport,
        },
        SnapshotImportModel,
    },
};
use serde_json::Value as JsonValue;
use shape_inference::{
    export_context::GeneratedSchema,
    ProdConfig,
};
use storage::Storage;
use sync_types::{
    backoff::Backoff,
    Timestamp,
};
use thousands::Separable;
use usage_tracking::{
    CallType,
    FunctionUsageTracker,
    StorageCallTracker,
    UsageCounter,
};
use value::{
    id_v6::DeveloperDocumentId,
    ConvexObject,
    ConvexValue,
    IdentifierFieldName,
    ResolvedDocumentId,
    Size,
    TableMapping,
    TableNamespace,
    TableNumber,
    TabletId,
    TabletIdAndTableNumber,
};

use crate::{
    snapshot_import::{
        audit_log::make_audit_log_event,
        confirmation::info_message_for_import,
        import_error::{
            wrap_import_err,
            ImportError,
        },
        import_file_storage::import_storage_table,
        metrics::log_snapshot_import_age,
        parse::{
            parse_import_file,
            ImportDocumentStream,
            ImportStorageFileStream,
            ParsedImport,
        },
        prepare_component::prepare_component_for_import,
        progress::{
            add_checkpoint_message,
            best_effort_update_progress_message,
        },
        schema_constraints::{
            schemas_for_import,
            ImportSchemaConstraints,
            SchemasForImport,
        },
    },
    Application,
};

mod audit_log;
mod confirmation;
mod import_error;
mod import_file_storage;
mod metrics;
mod parse;
mod prepare_component;
mod progress;
mod schema_constraints;
mod table_change;
#[cfg(test)]
mod tests;
mod worker;

pub use worker::SnapshotImportWorker;

// NB: This is a bandaid. In general, we want to retry forever on system
// failures, because all system failures should be transient. If we have
// nontransient system errors, those are bugs and we should fix them. However,
// while we are in the process of fixing them, use this as a bandaid to limit
// the damage. Once nontransient system errors are fixed, we can remove this.
const SNAPSHOT_IMPORT_MAX_SYSTEM_FAILURES: u32 = 5;

struct SnapshotImportExecutor<RT: Runtime> {
    runtime: RT,
    database: Database<RT>,
    snapshot_imports_storage: Arc<dyn Storage>,
    file_storage: FileStorage<RT>,
    usage_tracking: UsageCounter,
    backoff: Backoff,
}

impl<RT: Runtime> SnapshotImportExecutor<RT> {
    async fn handle_uploaded_state(
        &self,
        snapshot_import: ParsedDocument<SnapshotImport>,
    ) -> anyhow::Result<()> {
        anyhow::ensure!(snapshot_import.state == ImportState::Uploaded);
        tracing::info!("Marking snapshot import as WaitingForConfirmation");
        let import_id = snapshot_import.id();
        match info_message_for_import(self, snapshot_import).await {
            Ok((info_message, require_manual_confirmation, new_checkpoints)) => {
                self.database
                    .execute_with_overloaded_retries(
                        Identity::system(),
                        FunctionUsageTracker::new(),
                        "snapshot_import_waiting_for_confirmation",
                        |tx| {
                            async {
                                let mut import_model = SnapshotImportModel::new(tx);
                                import_model
                                    .mark_waiting_for_confirmation(
                                        import_id,
                                        info_message.clone(),
                                        require_manual_confirmation,
                                        new_checkpoints.clone(),
                                    )
                                    .await?;
                                Ok(())
                            }
                            .into()
                        },
                    )
                    .await?;
            },
            Err(e) => {
                let mut e = wrap_import_err(e);
                if e.is_bad_request()
                    || self.backoff.failures() >= SNAPSHOT_IMPORT_MAX_SYSTEM_FAILURES
                {
                    report_error(&mut e).await;
                    self.database
                        .execute_with_overloaded_retries(
                            Identity::system(),
                            FunctionUsageTracker::new(),
                            "snapshot_import_fail",
                            |tx| {
                                async {
                                    let mut import_model = SnapshotImportModel::new(tx);
                                    import_model
                                        .fail_import(import_id, e.user_facing_message())
                                        .await?;
                                    Ok(())
                                }
                                .into()
                            },
                        )
                        .await?;
                } else {
                    anyhow::bail!(e);
                }
            },
        }
        Ok(())
    }

    async fn handle_in_progress_state(
        &mut self,
        snapshot_import: ParsedDocument<SnapshotImport>,
    ) -> anyhow::Result<()> {
        anyhow::ensure!(matches!(
            snapshot_import.state,
            ImportState::InProgress { .. }
        ));
        let import_id = snapshot_import.id();
        match self.attempt_perform_import(snapshot_import).await {
            Ok((ts, num_rows_written)) => {
                self.database
                    .execute_with_overloaded_retries(
                        Identity::system(),
                        FunctionUsageTracker::new(),
                        "snapshot_import_complete",
                        |tx| {
                            async {
                                let mut import_model = SnapshotImportModel::new(tx);
                                import_model
                                    .complete_import(import_id, ts, num_rows_written)
                                    .await?;
                                Ok(())
                            }
                            .into()
                        },
                    )
                    .await?;
            },
            Err(e) => {
                let mut e = wrap_import_err(e);
                if e.is_bad_request()
                    || self.backoff.failures() >= SNAPSHOT_IMPORT_MAX_SYSTEM_FAILURES
                {
                    report_error(&mut e).await;
                    self.database
                        .execute_with_overloaded_retries(
                            Identity::system(),
                            FunctionUsageTracker::new(),
                            "snapshot_import_fail",
                            |tx| {
                                async {
                                    let mut import_model = SnapshotImportModel::new(tx);
                                    import_model
                                        .fail_import(import_id, e.user_facing_message())
                                        .await?;
                                    Ok(())
                                }
                                .into()
                            },
                        )
                        .await?;
                } else {
                    anyhow::bail!(e);
                }
            },
        }
        Ok(())
    }

    fn fail_if_too_old(
        &self,
        snapshot_import: &ParsedDocument<SnapshotImport>,
    ) -> anyhow::Result<()> {
        let creation_time = snapshot_import.creation_time();
        let now = CreationTime::try_from(*self.database.now_ts_for_reads())?;
        let age = Duration::from_millis((f64::from(now) - f64::from(creation_time)) as u64);
        log_snapshot_import_age(age);
        tracing::info!(
            "SnapshotImport attempt of {} starting ({:?}) after its creation.",
            snapshot_import.id(),
            age
        );
        if age > *MAX_IMPORT_AGE {
            anyhow::bail!(ErrorMetadata::bad_request(
                "ImportFailed",
                "Import took too long. Try again or contact Convex."
            ));
        }
        Ok(())
    }

    async fn attempt_perform_import(
        &mut self,
        snapshot_import: ParsedDocument<SnapshotImport>,
    ) -> anyhow::Result<(Timestamp, u64)> {
        self.fail_if_too_old(&snapshot_import)?;
        let (initial_schemas, import) = self.parse_import(snapshot_import.id()).await?;

        let usage = FunctionUsageTracker::new();

        let (imported_tables, total_documents_imported) = import_objects(
            &self.database,
            &self.file_storage,
            Identity::system(),
            &initial_schemas,
            snapshot_import.mode,
            import,
            usage.clone(),
            Some(snapshot_import.id()),
            snapshot_import.requestor.clone(),
        )
        .await?;

        let object_attributes = (match &snapshot_import.object_key {
            Ok(key) => {
                self.snapshot_imports_storage
                    .get_fq_object_attributes(key)
                    .await
            },
            Err(key) => {
                self.snapshot_imports_storage
                    .get_object_attributes(key)
                    .await
            },
        })?
        .context("error getting export object attributes from S3")?;
        // Charge file bandwidth for the download of the snapshot from imports
        // storage.
        usage
            .track_storage_egress_size(
                ComponentPath::root(),
                snapshot_import.requestor.usage_tag().to_string(),
                object_attributes.size,
            )
            .await;

        let pause_client = self.runtime.pause_client();
        pause_client.wait("before_finalize_import").await;

        let (id, snapshot_import) = snapshot_import.into_id_and_value();
        let (ts, _documents_deleted) = finalize_import(
            &self.database,
            &self.usage_tracking,
            Identity::system(),
            snapshot_import.member_id,
            initial_schemas,
            snapshot_import.mode,
            imported_tables,
            usage,
            AuditLogInfo::SnapshotImport {
                import_format: snapshot_import.format,
            },
            Some(id),
            snapshot_import.requestor.clone(),
        )
        .await?;
        Ok((ts, total_documents_imported))
    }

    async fn parse_import(
        &self,
        import_id: ResolvedDocumentId,
    ) -> anyhow::Result<(SchemasForImport, ParsedImport)> {
        let SnapshotImport {
            object_key,
            format,
            component_path,
            ..
        } = {
            let mut tx = self.database.begin(Identity::system()).await?;
            let mut model = SnapshotImportModel::new(&mut tx);
            model
                .get(import_id)
                .await?
                .context("import not found")?
                .into_value()
        };
        let fq_key = match object_key {
            Ok(key) => key,
            Err(key) => self.snapshot_imports_storage.fully_qualified_key(&key),
        };
        let import = parse_import_file(
            format.clone(),
            component_path.clone(),
            self.snapshot_imports_storage.clone(),
            fq_key,
        )
        .await?;

        let component_id = prepare_component_for_import(&self.database, &component_path).await?;

        // Remapping could be more extensive here; it's just relatively simple to
        // handle optional types. We do remapping after parsing rather than during
        // parsing because it seems expensive to read and parse all objects inside
        // of a transaction, though I haven't explicitly tested the performance.
        let mut tx = self.database.begin(Identity::system()).await?;
        let initial_schemas = schemas_for_import(&mut tx).await?;
        let import = match format {
            ImportFormat::Csv(table_name) => {
                remap_empty_string_by_schema(
                    TableNamespace::from(component_id),
                    table_name,
                    &mut tx,
                    import,
                )
                .await?
            },
            _ => import,
        };
        drop(tx);
        Ok((initial_schemas, import))
    }
}

pub async fn start_stored_import<RT: Runtime>(
    application: &Application<RT>,
    identity: Identity,
    format: ImportFormat,
    mode: ImportMode,
    component_path: ComponentPath,
    fq_object_key: FullyQualifiedObjectKey,
    requestor: ImportRequestor,
) -> anyhow::Result<DeveloperDocumentId> {
    if !(identity.is_admin() || identity.is_system()) {
        anyhow::bail!(ImportError::Unauthorized);
    }
    let (_, id, _) = application
        .database
        .execute_with_overloaded_retries(
            identity,
            FunctionUsageTracker::new(),
            "snapshot_import_store_uploaded",
            |tx| {
                async {
                    let mut model = SnapshotImportModel::new(tx);
                    model
                        .start_import(
                            format.clone(),
                            mode,
                            component_path.clone(),
                            fq_object_key.clone(),
                            requestor.clone(),
                        )
                        .await
                }
                .into()
            },
        )
        .await?;
    Ok(id.into())
}

pub async fn perform_import<RT: Runtime>(
    application: &Application<RT>,
    identity: Identity,
    import_id: DeveloperDocumentId,
) -> anyhow::Result<()> {
    if !identity.is_admin() {
        anyhow::bail!(ImportError::Unauthorized);
    }
    application
        .database
        .execute_with_overloaded_retries(
            identity,
            FunctionUsageTracker::new(),
            "snapshot_import_perform",
            |tx| {
                async {
                    let import_id = tx.resolve_developer_id(&import_id, TableNamespace::Global)?;
                    let mut import_model = SnapshotImportModel::new(tx);
                    import_model.confirm_import(import_id).await?;
                    Ok(())
                }
                .into()
            },
        )
        .await?;
    Ok(())
}

pub async fn cancel_import<RT: Runtime>(
    application: &Application<RT>,
    identity: Identity,
    import_id: DeveloperDocumentId,
) -> anyhow::Result<()> {
    if !identity.is_admin() {
        anyhow::bail!(ImportError::Unauthorized);
    }
    application
        .database
        .execute_with_overloaded_retries(
            identity,
            FunctionUsageTracker::new(),
            "snapshot_import_cancel",
            |tx| {
                async {
                    let import_id = tx.resolve_developer_id(&import_id, TableNamespace::Global)?;
                    let mut import_model = SnapshotImportModel::new(tx);
                    import_model.cancel_import(import_id).await?;
                    Ok(())
                }
                .into()
            },
        )
        .await?;
    Ok(())
}

async fn wait_for_import_worker<RT: Runtime>(
    application: &Application<RT>,
    identity: Identity,
    import_id: DeveloperDocumentId,
) -> anyhow::Result<ParsedDocument<SnapshotImport>> {
    let snapshot_import = loop {
        let mut tx = application.begin(identity.clone()).await?;
        let import_id = tx.resolve_developer_id(&import_id, TableNamespace::Global)?;
        let mut import_model = SnapshotImportModel::new(&mut tx);
        let snapshot_import =
            import_model
                .get(import_id)
                .await?
                .context(ErrorMetadata::not_found(
                    "ImportNotFound",
                    format!("import {import_id} not found"),
                ))?;
        match &snapshot_import.state {
            ImportState::Uploaded | ImportState::InProgress { .. } => {
                let token = tx.into_token()?;
                application
                    .subscribe(token)
                    .await?
                    .wait_for_invalidation()
                    .await;
            },
            ImportState::WaitingForConfirmation { .. }
            | ImportState::Completed { .. }
            | ImportState::Failed(..) => {
                break snapshot_import;
            },
        }
    };
    Ok(snapshot_import)
}

pub async fn do_import<RT: Runtime>(
    application: &Application<RT>,
    identity: Identity,
    format: ImportFormat,
    mode: ImportMode,
    component_path: ComponentPath,
    body_stream: BoxStream<'_, anyhow::Result<Bytes>>,
) -> anyhow::Result<u64> {
    let object_key = application.upload_snapshot_import(body_stream).await?;
    do_import_from_object_key(
        application,
        identity,
        format,
        mode,
        component_path,
        object_key,
    )
    .await
}

pub async fn do_import_from_object_key<RT: Runtime>(
    application: &Application<RT>,
    identity: Identity,
    format: ImportFormat,
    mode: ImportMode,
    component_path: ComponentPath,
    export_object_key: FullyQualifiedObjectKey,
) -> anyhow::Result<u64> {
    let import_id = start_stored_import(
        application,
        identity.clone(),
        format,
        mode,
        component_path,
        export_object_key,
        ImportRequestor::SnapshotImport,
    )
    .await?;

    let snapshot_import = wait_for_import_worker(application, identity.clone(), import_id).await?;
    match &snapshot_import.state {
        ImportState::Uploaded | ImportState::InProgress { .. } | ImportState::Completed { .. } => {
            anyhow::bail!("should be WaitingForConfirmation, is {snapshot_import:?}")
        },
        ImportState::WaitingForConfirmation { .. } => {},
        ImportState::Failed(e) => {
            anyhow::bail!(ErrorMetadata::bad_request("ImportFailed", e.to_string()))
        },
    }

    perform_import(application, identity.clone(), import_id).await?;

    let snapshot_import = wait_for_import_worker(application, identity.clone(), import_id).await?;
    match &snapshot_import.state {
        ImportState::Uploaded
        | ImportState::WaitingForConfirmation { .. }
        | ImportState::InProgress { .. } => {
            anyhow::bail!("should be done, is {snapshot_import:?}")
        },
        ImportState::Completed {
            ts: _,
            num_rows_written,
        } => Ok(*num_rows_written as u64),
        ImportState::Failed(e) => {
            anyhow::bail!(ErrorMetadata::bad_request("ImportFailed", e.to_string()))
        },
    }
}

/// Clears tables atomically.
/// Returns the number of documents deleted.
/// This is implemented as an import of empty tables in Replace mode.
///
/// N.B.: This will create components and tables if they don't exist.
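///
/// A minimal usage sketch (the `app` and `identity` bindings are assumed to
/// come from the caller's context; this is illustrative, not a test):
///
/// ```ignore
/// let deleted = clear_tables(
///     &app,
///     &identity,
///     vec![(ComponentPath::root(), "messages".parse()?)],
/// )
/// .await?;
/// println!("deleted {deleted} documents");
/// ```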
pub async fn clear_tables<RT: Runtime>(
    application: &Application<RT>,
    identity: &Identity,
    table_names: Vec<(ComponentPath, TableName)>,
) -> anyhow::Result<u64> {
    let usage = FunctionUsageTracker::new();

    let (initial_schemas, original_table_mapping) = {
        let mut tx = application.begin(identity.clone()).await?;
        (
            schemas_for_import(&mut tx).await?,
            tx.table_mapping().clone(),
        )
    };

    let mut table_mapping = TableMapping::new();
    for (component_path, table_name) in table_names {
        let component_id =
            prepare_component_for_import(&application.database, &component_path).await?;
        let table_number = original_table_mapping
            .namespace(component_id.into())
            .id_and_number_if_exists(&table_name)
            .map(|t| t.table_number);
        let table_id = create_empty_table(
            &application.database,
            identity,
            component_id,
            &table_name,
            table_number,
            None, /* import_id */
            &table_name,
            &component_path,
        )
        .await?;
        table_mapping.insert(
            table_id.tablet_id,
            component_id.into(),
            table_id.table_number,
            table_name,
        );
    }

    let (_ts, documents_deleted) = finalize_import(
        &application.database,
        &application.usage_tracking,
        identity.clone(),
        None,
        initial_schemas,
        ImportMode::Replace,
        table_mapping,
        usage,
        AuditLogInfo::ClearTables,
        None,
        ImportRequestor::SnapshotImport,
    )
    .await?;

    Ok(documents_deleted)
}

/// Reads all objects from a [`ParsedImport`] and writes them into the
/// database. Returns a table mapping containing just the imported tables. If
/// `mode` is ReplaceAll, this includes empty tables for tables that should be
/// cleared.
async fn import_objects<RT: Runtime>(
    database: &Database<RT>,
    file_storage: &FileStorage<RT>,
    identity: Identity,
    initial_schemas: &SchemasForImport,
    mode: ImportMode,
    import: ParsedImport,
    usage: FunctionUsageTracker,
    import_id: Option<ResolvedDocumentId>,
    requestor: ImportRequestor,
) -> anyhow::Result<(TableMapping, u64)> {
    let mut generated_schemas: BTreeMap<_, _> = import
        .generated_schemas
        .into_iter()
        .map(|(component_path, table_name, generated_schema)| {
            ((component_path, table_name), generated_schema)
        })
        .collect();
    let mut total_num_documents = 0;

    // In ReplaceAll mode, we want to delete all unaffected user tables. If a
    // table appears in the schema, we want to clear it instead.
    let db_snapshot = database.latest_snapshot()?;
    let original_table_mapping = db_snapshot.table_mapping();

    // First make sure all components exist, and find their IDs.
    let tables: Vec<(
        ComponentPath,
        ComponentId,
        TableName,
        Peekable<ImportDocumentStream>,
    )> = stream::iter(import.documents)
        .then(async |(component_path, mut table_name, objects)| {
            let component_id = prepare_component_for_import(database, &component_path).await?;
            // Remap the storage table; this is a bit hacky.
            if table_name == *FILE_STORAGE_VIRTUAL_TABLE {
                table_name = FILE_STORAGE_TABLE.clone();
            }
            anyhow::Ok((component_path, component_id, table_name, objects.peekable()))
        })
        .try_collect()
        .await?;
    let (tables_tables, mut tables) = tables
        .into_iter()
        .partition::<Vec<_>, _>(|(_, _, table_name, _)| *table_name == *TABLES_TABLE);

    database
        .runtime()
        .pause_client()
        .wait("before_assign_table_numbers")
        .await;

    // Assign table numbers based on the `_tables` tables and peeking at the
    // imported objects.
    let table_name_to_number = assign_table_numbers(
        database,
        &mode,
        tables_tables,
        &mut tables,
        original_table_mapping,
        initial_schemas,
    )
    .await?;

    // Now prepare all imported tables using the requested table numbers.
    // This creates empty hidden tables or resumes from a checkpoint.
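    // (Hidden tables are invisible to readers until finalize_import promotes
    // them to Active. So, for example, replacing a populated "messages" table
    // writes into a fresh hidden tablet and only swaps it in once the whole
    // import succeeds; a failed or canceled import never leaves partially
    // imported data visible.)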
    let mut table_mapping_in_import = TableMapping::new();
    let mut tablet_id_to_num_to_skip: BTreeMap<TabletId, u64> = BTreeMap::new();
    for (&(component_id, ref table_name), &table_number) in &table_name_to_number {
        let (table_id, num_to_skip) = prepare_table_for_import(
            database,
            &identity,
            mode,
            component_id,
            table_name,
            table_number,
            import_id,
        )
        .await?;
        table_mapping_in_import.insert(
            table_id.tablet_id,
            component_id.into(),
            table_id.table_number,
            table_name.clone(),
        );
        anyhow::ensure!(tablet_id_to_num_to_skip
            .insert(table_id.tablet_id, num_to_skip)
            .is_none());
    }

    let table_mapping_for_schema = if let ImportMode::ReplaceAll = mode {
        table_mapping_in_import.clone()
    } else {
        let mut mapping = original_table_mapping.clone();
        for &(component_id, ref table_name) in table_name_to_number.keys() {
            if let Some(table_id) = mapping
                .namespace(component_id.into())
                .id_and_number_if_exists(table_name)
            {
                mapping.remove(table_id.tablet_id);
            }
        }
        for (table_id, namespace, table_number, table_name) in table_mapping_in_import.iter() {
            mapping.insert(table_id, namespace, table_number, table_name.clone());
        }
        mapping
    };

    let mut storage_files_by_component: BTreeMap<
        ComponentPath,
        Vec<(DeveloperDocumentId, ImportStorageFileStream)>,
    > = BTreeMap::new();
    for (component_path, id, stream) in import.storage_files {
        storage_files_by_component
            .entry(component_path)
            .or_default()
            .push((id, stream));
    }

    for (component_path, component_id, table_name, document_stream) in tables {
        let generated_schema =
            generated_schemas.get_mut(&(&component_path, &table_name) as &dyn TupleKey<_, _>);
        let table_id = table_mapping_in_import
            .namespace(component_id.into())
            .id(&table_name)?;
        total_num_documents += import_single_table(
            database,
            file_storage,
            &identity,
            &component_path,
            &table_name,
            document_stream,
            &mut storage_files_by_component,
            generated_schema,
            &table_mapping_for_schema,
            table_id,
            *tablet_id_to_num_to_skip
                .get(&table_id.tablet_id)
                .context("missing entry in tablet_id_to_num_to_skip")?,
            usage.clone(),
            import_id,
            requestor.clone(),
        )
        .await?;
    }
    Ok((table_mapping_in_import, total_num_documents))
}

struct TableMappingForImport {
    table_mapping_in_import: TableMapping,
    to_delete: BTreeMap<TabletId, (TableNamespace, TableNumber, TableName)>,
}

impl TableMappingForImport {
    fn tables_imported(&self) -> BTreeSet<(TableNamespace, TableName)> {
        self.table_mapping_in_import
            .iter()
            .map(|(_, namespace, _, table_name)| (namespace, table_name.clone()))
            .collect()
    }

    fn tables_deleted(&self) -> BTreeSet<(TableNamespace, TableName)> {
        self.to_delete
            .values()
            .filter(|(namespace, _table_number, table_name)| {
                !self
                    .table_mapping_in_import
                    .namespace(*namespace)
                    .name_exists(table_name)
            })
            .map(|(namespace, _table_number, table_name)| (*namespace, table_name.clone()))
            .collect()
    }
}

enum AuditLogInfo {
    ClearTables,
    SnapshotImport { import_format: ImportFormat },
}

async fn finalize_import<RT: Runtime>(
    database: &Database<RT>,
    usage_tracking: &UsageCounter,
    identity: Identity,
    member_id_override: Option<MemberId>,
    initial_schemas: SchemasForImport,
    mode: ImportMode,
    imported_tables: TableMapping,
    usage: FunctionUsageTracker,
    audit_log_info: AuditLogInfo,
    import_id: Option<ResolvedDocumentId>,
    requestor: ImportRequestor,
) -> anyhow::Result<(Timestamp, u64)> {
    // Ensure that schemas will be valid after the tables are activated.
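    // (For example, a schema field declared as v.id("users") is only valid if
    // "users" ends up with the table number it has in the simulated final
    // mapping, which is why the constraints computed here are validated again
    // inside the finalize transaction below.)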
    // TODO: we should be checking that `initial_schemas` matches the schemas
    // at time of commit.
    let schema_constraints = ImportSchemaConstraints::new(&imported_tables, initial_schemas);

    // If we inserted into an existing table, we're done because the table is
    // now populated and active.
    // If we inserted into a Hidden table, make it Active.
    let (ts, documents_deleted, _) = database
        .execute_with_overloaded_retries(
            identity,
            FunctionUsageTracker::new(),
            "snapshot_import_finalize",
            |tx| {
                async {
                    if let Some(import_id) = import_id {
                        // Only finalize the import if it's in progress.
                        let mut snapshot_import_model = SnapshotImportModel::new(tx);
                        let snapshot_import_state =
                            snapshot_import_model.must_get_state(import_id).await?;
                        match snapshot_import_state {
                            ImportState::InProgress { .. } => {},
                            // This can happen if the import was canceled or somehow retried
                            // after completion. These errors won't show up to the user
                            // because those are already terminal states, so we won't
                            // transition to a new state due to this error.
                            ImportState::Failed(e) => anyhow::bail!("Import failed: {e}"),
                            ImportState::Completed { .. } => {
                                anyhow::bail!("Import already completed")
                            },
                            // Indicates a bug -- we shouldn't be finalizing an import that
                            // hasn't started yet.
                            ImportState::Uploaded
                            | ImportState::WaitingForConfirmation { .. } => {
                                anyhow::bail!("Import is not in progress")
                            },
                        }
                    }
                    let to_delete = match mode {
                        ImportMode::Append | ImportMode::Replace | ImportMode::RequireEmpty => {
                            BTreeMap::new()
                        },
                        ImportMode::ReplaceAll => {
                            let existing_tables = tx.table_mapping().clone();
                            existing_tables
                                .iter_active_user_tables()
                                .filter(|&(_tablet_id, namespace, _table_number, table_name)| {
                                    // Avoid deleting componentless namespaces (created during
                                    // start_push).
                                    if tx.get_component_path(namespace.into()).is_none() {
                                        return false;
                                    }
                                    // If it was written by the import, don't clear it or
                                    // delete it.
                                    !imported_tables
                                        .namespace(namespace)
                                        .name_exists(table_name)
                                })
                                .map(|(tablet_id, namespace, table_number, table_name)| {
                                    (tablet_id, (namespace, table_number, table_name.clone()))
                                })
                                .collect()
                        },
                    };
                    let table_mapping_for_import = TableMappingForImport {
                        table_mapping_in_import: imported_tables.clone(),
                        to_delete,
                    };
                    let audit_log_event = match &audit_log_info {
                        AuditLogInfo::ClearTables => DeploymentAuditLogEvent::ClearTables,
                        AuditLogInfo::SnapshotImport { import_format } => {
                            make_audit_log_event(
                                tx,
                                &table_mapping_for_import,
                                mode,
                                import_format.clone(),
                                requestor.clone(),
                            )
                            .await?
                        },
                    };

                    let mut documents_deleted = 0;
                    for tablet_id in table_mapping_for_import.to_delete.keys() {
                        let namespace = tx.table_mapping().tablet_namespace(*tablet_id)?;
                        let table_name = tx.table_mapping().tablet_name(*tablet_id)?;
                        let mut table_model = TableModel::new(tx);
                        documents_deleted += table_model
                            .count(namespace, &table_name)
                            .await?
                            .unwrap_or(0);
                        tracing::info!(
                            "finalize_import({import_id:?}) Deleting table {table_name} in \
                             namespace {namespace:?}"
                        );
                        table_model
                            .delete_active_table(namespace, table_name)
                            .await?;
                    }

                    schema_constraints.validate(tx).await?;

                    let mut table_model = TableModel::new(tx);
                    documents_deleted += assert_send(table_model.activate_tables(
                        table_mapping_for_import.table_mapping_in_import.iter().map(
                            |(tablet_id, namespace, _table_number, table_name)| {
                                tracing::info!(
                                    "finalize_import({import_id:?}) Activating table \
                                     {table_name} in namespace {namespace:?}"
                                );
                                tablet_id
                            },
                        ),
                    ))
                    .await?;

                    DeploymentAuditLogModel::new(tx)
                        .insert_with_member_override(
                            vec![audit_log_event.clone()],
                            member_id_override,
                        )
                        .await?;

                    Ok(documents_deleted)
                }
                .into()
            },
        )
        .await?;

    let tag = requestor.usage_tag().to_string();
    let call_type = match requestor {
        ImportRequestor::SnapshotImport => CallType::Import,
        ImportRequestor::CloudRestore { .. } => CallType::CloudRestore,
    };
    // Charge database bandwidth accumulated during the import.
    usage_tracking
        .track_call(
            UdfIdentifier::SystemJob(tag),
            ExecutionId::new(),
            RequestId::new(),
            call_type,
            true,
            usage.gather_user_stats(),
        )
        .await;

    Ok((ts, documents_deleted))
}

/// Assign table numbers. There are numerous constraints:
/// - table numbers must not conflict after the import is finalized
/// - table numbers encoded in _id fields should match their tables
/// - schema validation for v.id() columns must pass with the final table
///   numbers
async fn assign_table_numbers<RT: Runtime>(
    database: &Database<RT>,
    mode: &ImportMode,
    tables_tables: Vec<(
        ComponentPath,
        ComponentId,
        TableName,
        Peekable<ImportDocumentStream>,
    )>,
    tables: &mut Vec<(
        ComponentPath,
        ComponentId,
        TableName,
        Peekable<ImportDocumentStream>,
    )>,
    original_table_mapping: &TableMapping,
    initial_schemas: &SchemasForImport,
) -> anyhow::Result<BTreeMap<(ComponentId, TableName), Option<TableNumber>>> {
    // `None` here means that we'll pick any number.
    let mut table_name_to_number: BTreeMap<(ComponentId, TableName), Option<TableNumber>> =
        BTreeMap::new();
    let mut table_number_to_name: BTreeMap<(ComponentId, TableNumber), TableName> =
        BTreeMap::new();
    let mut assign_number = |component_path: &ComponentPath,
                             component_id: ComponentId,
                             table_name: TableName,
                             table_number: TableNumber| {
        match table_number_to_name.entry((component_id, table_number)) {
            Entry::Vacant(v) => v.insert(table_name),
            Entry::Occupied(o) => anyhow::bail!(ErrorMetadata::bad_request(
                "InvalidId",
                format!(
                    "conflict between `{}` and `{}`{} with number {table_number}",
                    o.get(),
                    table_name,
                    component_path.in_component_str(),
                )
            )),
        };
        anyhow::Ok(())
    };

    // Step 1: Read _tables if present in the import. If we're importing an
    // untouched snapshot export, this will assign every table a proper number.
    for (component_path, component_id, _, objects) in tables_tables {
        let mut stream = parse_tables_table(objects);
        while let Some((table_name, table_number)) = stream.try_next().await? {
            anyhow::ensure!(
                table_name_to_number
                    .insert((component_id, table_name.clone()), Some(table_number))
                    .is_none(),
                ErrorMetadata::bad_request(
                    "DuplicateTableName",
                    format!(
                        "`_tables` contains duplicate entries for `{table_name}`{}",
                        component_path.in_component_str()
                    )
                )
            );
            assign_number(&component_path, component_id, table_name, table_number)?;
        }
    }

    // Step 2: For tables that aren't listed in `_tables`, read their first
    // object's _id field (if present) to infer a table number to assign that
    // table.
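    //
    // (Illustrative: an exported document's string `_id` encodes the table
    // number it was created with, so decoding one `_id` via
    // DeveloperDocumentId::decode -- see table_number_for_import below --
    // lets re-imported documents keep their original ids.)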
    for (component_path, component_id, table_name, objects) in tables.iter_mut() {
        let Entry::Vacant(v) = table_name_to_number.entry((*component_id, table_name.clone()))
        else {
            continue;
        };
        if let Some(table_number) = table_number_for_import(objects).await {
            v.insert(Some(table_number));
            // If this creates a table number conflict with step 1, raise an
            // error right away, since the documents will otherwise fail to
            // insert.
            assign_number(
                component_path,
                *component_id,
                table_name.clone(),
                table_number,
            )?;
        }
    }

    // Step 3: For tables that don't have _id fields, reuse their table number
    // from the current table mapping, if possible. Otherwise, they will get a
    // fresh table number (`None` in the map).
    let mut assign_existing_table_number = |component_id: ComponentId, table_name: &TableName| {
        if let Entry::Vacant(v) = table_name_to_number.entry((component_id, table_name.clone())) {
            let number = if let Some(existing) = original_table_mapping
                .namespace(component_id.into())
                .id_and_number_if_exists(table_name)
                && let Entry::Vacant(v2) =
                    table_number_to_name.entry((component_id, existing.table_number))
            {
                v2.insert(table_name.clone());
                Some(existing.table_number)
            } else {
                None
            };
            v.insert(number);
        }
    };
    for (_, component_id, table_name, _) in tables.iter() {
        assign_existing_table_number(*component_id, table_name);
    }

    // In ReplaceAll mode, any tables that aren't imported are going to be
    // deleted during finalization. For tables in the schema, we should replace
    // them with empty tables instead.
    // TODO: this is racy, as the schemas could change before finalization.
    if let ImportMode::ReplaceAll = mode {
        for (namespace, schema_state, (_, schema)) in initial_schemas {
            let component_id = ComponentId::from(*namespace);
            if let SchemaState::Active = schema_state {
                for table_name in schema.tables.keys() {
                    assign_existing_table_number(component_id, table_name);
                }
            }
        }
    } else {
        // Proactively check for table number conflicts before inserting any
        // objects. This check is not authoritative, as the table mapping may
        // change again before the import is finalized.
        //
        // This isn't relevant for ReplaceAll mode since all original tables
        // will be deleted.
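        //
        // (e.g. if the import brings in a "users" table claiming table number
        // 5 while an untouched, active table already holds number 5, we bail
        // out with a table conflict error here rather than at finalization.)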
        for (tablet_id, namespace, table_number, table_name) in original_table_mapping.iter() {
            let component_id = ComponentId::from(namespace);
            if original_table_mapping.is_active(tablet_id)
                && !table_name_to_number
                    .contains_key(&(&component_id, table_name) as &dyn TupleKey<_, _>)
                && let Some(imported_table_name) =
                    table_number_to_name.get(&(component_id, table_number))
            {
                let mut tx = database.begin_system().await?;
                let component_path =
                    BootstrapComponentsModel::new(&mut tx).get_component_path(component_id);
                anyhow::bail!(TableModel::<RT>::table_conflict_error(
                    tx.virtual_system_mapping(),
                    &component_path.unwrap_or_default(),
                    imported_table_name,
                    table_name
                ));
            }
        }
    }

    Ok(table_name_to_number)
}

fn parse_tables_table(
    objects: impl Stream<Item = anyhow::Result<JsonValue>> + Unpin,
) -> impl Stream<Item = anyhow::Result<(TableName, TableNumber)>> + Unpin {
    let mut lineno = 0;
    objects.map(move |r| {
        let exported_value = r?;
        lineno += 1;
        let exported_object = exported_value
            .as_object()
            .with_context(|| ImportError::NotAnObject(lineno))?;
        let table_name = exported_object
            .get("name")
            .and_then(|name| name.as_str())
            .with_context(|| {
                ImportError::InvalidConvexValue(lineno, anyhow::anyhow!("table requires name"))
            })?;
        let table_name = table_name
            .parse()
            .map_err(|e| ImportError::InvalidName(table_name.to_string(), e))?;
        let table_number = exported_object
            .get("id")
            .and_then(|id| id.as_f64())
            .and_then(|id| TableNumber::try_from(id as u32).ok())
            .with_context(|| {
                ImportError::InvalidConvexValue(
                    lineno,
                    anyhow::anyhow!(
                        "table requires id (received {:?})",
                        exported_object.get("id")
                    ),
                )
            })?;
        Ok((table_name, table_number))
    })
}

async fn import_single_table<RT: Runtime>(
    database: &Database<RT>,
    file_storage: &FileStorage<RT>,
    identity: &Identity,
    component_path: &ComponentPath,
    table_name: &TableName,
    mut objects: Peekable<ImportDocumentStream>,
    storage_files_by_component: &mut BTreeMap<
        ComponentPath,
        Vec<(DeveloperDocumentId, ImportStorageFileStream)>,
    >,
    mut generated_schema: Option<&mut GeneratedSchema<ProdConfig>>,
    table_mapping_for_schema: &TableMapping,
    table_id: TabletIdAndTableNumber,
    num_to_skip: u64,
    usage: FunctionUsageTracker,
    import_id: Option<ResolvedDocumentId>,
    requestor: ImportRequestor,
) -> anyhow::Result<u64> {
    if let Some(import_id) = import_id {
        best_effort_update_progress_message(
            database,
            identity,
            import_id,
            format!(
                "Importing \"{}\"{}",
                table_name,
                component_path.in_component_str()
            ),
            component_path,
            table_name,
            0,
        )
        .await;
    }
    anyhow::ensure!(*table_name != *TABLES_TABLE);
    if *table_name == *FILE_STORAGE_TABLE {
        let storage_files = storage_files_by_component
            .remove(component_path)
            .unwrap_or_default();
        import_storage_table(
            database,
            file_storage,
            identity,
            table_id,
            component_path,
            objects,
            storage_files,
            &usage,
            import_id,
            num_to_skip,
            requestor,
            table_mapping_for_schema,
        )
        .await?;
        return Ok(0);
    }

    let mut num_objects = 0;
    let mut objects_to_insert = vec![];
    let mut objects_to_insert_size = 0;
    while let Some(exported_value) = objects.try_next().await? {
        if num_objects < num_to_skip {
            num_objects += 1;
            continue;
        }
        let row_number = num_objects + 1;
        let convex_value =
            GeneratedSchema::<ProdConfig>::apply(generated_schema.as_deref_mut(), exported_value)
                .map_err(|e| ImportError::InvalidConvexValue(row_number, e))?;
        let ConvexValue::Object(convex_object) = convex_value else {
            anyhow::bail!(ImportError::NotAnObject(row_number));
        };
        objects_to_insert_size += convex_object.size();
        objects_to_insert.push(convex_object);

        if objects_to_insert_size > *TRANSACTION_MAX_USER_WRITE_SIZE_BYTES / 2
            || objects_to_insert.len() > *TRANSACTION_MAX_NUM_USER_WRITES / 2
        {
            insert_import_objects(
                database,
                identity,
                objects_to_insert,
                table_name,
                table_id,
                table_mapping_for_schema,
                usage.clone(),
            )
            .await?;
            objects_to_insert = Vec::new();
            objects_to_insert_size = 0;
            if let Some(import_id) = import_id {
                best_effort_update_progress_message(
                    database,
                    identity,
                    import_id,
                    format!(
                        "Importing \"{table_name}\" ({} documents)",
                        num_objects.separate_with_commas()
                    ),
                    component_path,
                    table_name,
                    num_objects as i64,
                )
                .await;
            }
        }
        num_objects += 1;
    }

    insert_import_objects(
        database,
        identity,
        objects_to_insert,
        table_name,
        table_id,
        table_mapping_for_schema,
        usage,
    )
    .await?;

    if let Some(import_id) = import_id {
        add_checkpoint_message(
            database,
            identity,
            import_id,
            format!(
                "Imported \"{table_name}\"{} ({} documents)",
                component_path.in_component_str(),
                num_objects.separate_with_commas()
            ),
            component_path,
            table_name,
            num_objects as i64,
        )
        .await?;
    }
    Ok(num_objects)
}

async fn insert_import_objects<RT: Runtime>(
    database: &Database<RT>,
    identity: &Identity,
    objects_to_insert: Vec<ConvexObject>,
    table_name: &TableName,
    table_id: TabletIdAndTableNumber,
    table_mapping_for_schema: &TableMapping,
    usage: FunctionUsageTracker,
) -> anyhow::Result<()> {
    if objects_to_insert.is_empty() {
        return Ok(());
    }
    let object_ids: Vec<_> = objects_to_insert
        .iter()
        .filter_map(|object| object.get(&**ID_FIELD))
        .collect();
    let object_ids_dedup: BTreeSet<_> = object_ids.iter().collect();
    if object_ids_dedup.len() < object_ids.len() {
        anyhow::bail!(ErrorMetadata::bad_request(
            "DuplicateId",
            format!("Objects in table \"{table_name}\" have duplicate _id fields")
        ));
    }
    database
        .execute_with_overloaded_retries(
            identity.clone(),
            usage,
            "snapshot_import_insert_objects",
            |tx| {
                async {
                    for object_to_insert in objects_to_insert.clone() {
                        ImportFacingModel::new(tx)
                            .insert(
                                table_id,
                                table_name,
                                object_to_insert,
                                table_mapping_for_schema,
                            )
                            .await?;
                    }
                    Ok(())
                }
                .into()
            },
        )
        .await?;
    Ok(())
}

async fn prepare_table_for_import<RT: Runtime>(
    database: &Database<RT>,
    identity: &Identity,
    mode: ImportMode,
    component_id: ComponentId,
    table_name: &TableName,
    table_number: Option<TableNumber>,
    import_id: Option<ResolvedDocumentId>,
) -> anyhow::Result<(TabletIdAndTableNumber, u64)> {
    anyhow::ensure!(
        table_name == &*FILE_STORAGE_TABLE || !table_name.is_system(),
        ErrorMetadata::bad_request(
            "InvalidTableName",
            format!("Invalid table name {table_name} starts with metadata prefix '_'")
        )
    );
    let display_table_name = if table_name == &*FILE_STORAGE_TABLE {
        &*FILE_STORAGE_VIRTUAL_TABLE
    } else {
        table_name
    };
    let mut tx = database.begin(identity.clone()).await?;
    let component_path = BootstrapComponentsModel::new(&mut tx)
        .get_component_path(component_id)
        .context("component disappeared")?;
    let existing_checkpoint = match import_id {
        Some(import_id) => {
            SnapshotImportModel::new(&mut tx)
                .get_table_checkpoint(import_id, &component_path, display_table_name)
                .await?
        },
        None => None,
    };
    let existing_checkpoint_tablet = existing_checkpoint
        .as_ref()
        .and_then(|checkpoint| checkpoint.tablet_id);
    let (insert_into_existing_table_id, num_to_skip) = match existing_checkpoint_tablet {
        Some(tablet_id) => {
            if let ImportMode::Append = mode {
                // TODO: resuming an append from checkpoint isn't possible
                // without a data model change (writing a cursor transactionally
                // with the written documents).
                anyhow::bail!("can't resume append import");
            }
            let existing_table_number = tx.table_mapping().tablet_number(tablet_id)?;
            let num_to_skip = TableModel::new(&mut tx)
                .must_count_tablet(tablet_id)
                .await?;
            (
                Some(TabletIdAndTableNumber {
                    tablet_id,
                    table_number: existing_table_number,
                }),
                num_to_skip,
            )
        },
        None => {
            let tablet_id = match mode {
                ImportMode::Append => tx
                    .table_mapping()
                    .namespace(component_id.into())
                    .id_and_number_if_exists(table_name),
                ImportMode::RequireEmpty => {
                    if TableModel::new(&mut tx)
                        .must_count(component_id.into(), table_name)
                        .await?
                        != 0
                    {
                        anyhow::bail!(ImportError::TableExists(table_name.clone()));
                    }
                    None
                },
                ImportMode::Replace | ImportMode::ReplaceAll => None,
            };
            (tablet_id, 0)
        },
    };
    drop(tx);

    let table_id = if let Some(insert_into_existing_table_id) = insert_into_existing_table_id {
        insert_into_existing_table_id
    } else {
        create_empty_table(
            database,
            identity,
            component_id,
            table_name,
            table_number,
            import_id,
            display_table_name,
            &component_path,
        )
        .await?
    };
    if let Some(requested_table_number) = table_number {
        // This should only happen for ImportMode::Append.
        anyhow::ensure!(
            requested_table_number == table_id.table_number,
            ErrorMetadata::bad_request(
                "TableNumberConflict",
                format!(
                    "table {table_name}{component} wants table number \
                     {requested_table_number} but was already assigned \
                     {actual_table_number}",
                    component = component_path.in_component_str(),
                    actual_table_number = table_id.table_number,
                )
            )
        );
    }
    Ok((table_id, num_to_skip))
}

async fn create_empty_table<RT: Runtime>(
    database: &Database<RT>,
    identity: &Identity,
    component_id: ComponentId,
    table_name: &TableName,
    table_number: Option<TableNumber>,
    import_id: Option<ResolvedDocumentId>,
    display_table_name: &TableName,
    component_path: &ComponentPath,
) -> anyhow::Result<TabletIdAndTableNumber> {
    let (_, table_id, _) = database
        .execute_with_overloaded_retries(
            identity.clone(),
            FunctionUsageTracker::new(),
            "snapshot_import_prepare_table",
            |tx| {
                async {
                    // Create a new table in state Hidden, which will later be changed to
                    // Active.
                    let table_id = TableModel::new(tx)
                        .insert_table_for_import(component_id.into(), table_name, table_number)
                        .await?;
                    IndexModel::new(tx)
                        .copy_indexes_to_table(
                            component_id.into(),
                            table_name,
                            table_id.tablet_id,
                        )
                        .await?;
                    if let Some(import_id) = import_id {
                        SnapshotImportModel::new(tx)
                            .checkpoint_tablet_created(
                                import_id,
                                component_path,
                                display_table_name,
                                table_id.tablet_id,
                            )
                            .await?;
                    }
                    Ok(table_id)
                }
                .into()
            },
        )
        .await?;
    backfill_and_enable_indexes_on_table(database, identity, table_id.tablet_id).await?;
    Ok(table_id)
}

/// Waits for all indexes on a table to be backfilled, which may take a while
/// for large tables. After the indexes are backfilled, enable them.
async fn backfill_and_enable_indexes_on_table<RT: Runtime>(
    database: &Database<RT>,
    identity: &Identity,
    tablet_id: TabletId,
) -> anyhow::Result<()> {
    loop {
        let mut tx = database.begin(identity.clone()).await?;
        let still_backfilling = IndexModel::new(&mut tx)
            .all_indexes_on_table(tablet_id)
            .await?
            .into_iter()
            .any(|index| index.config.is_backfilling());
        if !still_backfilling {
            break;
        }
        let token = tx.into_token()?;
        let subscription = database.subscribe(token).await?;
        subscription.wait_for_invalidation().await;
    }
    // Enable the indexes now that they are backfilled.
    database
        .execute_with_overloaded_retries(
            identity.clone(),
            FunctionUsageTracker::new(),
            "snapshot_import_enable_indexes",
            |tx| {
                async {
                    let mut index_model = IndexModel::new(tx);
                    let mut backfilled_indexes = vec![];
                    for index in index_model.all_indexes_on_table(tablet_id).await? {
                        if !index.config.is_enabled() {
                            backfilled_indexes.push(index);
                        }
                    }
                    index_model
                        .enable_backfilled_indexes(backfilled_indexes)
                        .await?;
                    Ok(())
                }
                .into()
            },
        )
        .await?;
    Ok(())
}

async fn table_number_for_import(
    objects: &mut Peekable<ImportDocumentStream>,
) -> Option<TableNumber> {
    let first_object = Pin::new(objects).peek().await?.as_ref().ok()?;
    let object = first_object.as_object()?;
    let first_id = object.get(&**ID_FIELD)?;
    let JsonValue::String(id) = first_id else {
        return None;
    };
    let id_v6 = DeveloperDocumentId::decode(id).ok()?;
    Some(id_v6.table())
}

async fn remap_empty_string_by_schema<RT: Runtime>(
    namespace: TableNamespace,
    table_name: TableName,
    tx: &mut Transaction<RT>,
    mut import: ParsedImport,
) -> anyhow::Result<ParsedImport> {
    if let Some((_, schema)) = SchemaModel::new(tx, namespace)
        .get_by_state(SchemaState::Active)
        .await?
    {
        let document_schema = match schema
            .tables
            .get(&table_name)
            .and_then(|table_schema| table_schema.document_type.clone())
        {
            None => return Ok(import),
            Some(document_schema) => document_schema,
        };
        let optional_fields = document_schema.optional_top_level_fields();
        if optional_fields.is_empty() {
            return Ok(import);
        }
        import.documents = import
            .documents
            .into_iter()
            .map(move |(component, table, stream)| {
                let optional_fields = optional_fields.clone();
                (
                    component,
                    table,
                    stream
                        .map_ok(move |mut object| {
                            remove_empty_string_optional_entries(&optional_fields, &mut object);
                            object
                        })
                        .boxed(),
                )
            })
            .collect();
    }
    Ok(import)
}

fn remove_empty_string_optional_entries(
    optional_fields: &HashSet<IdentifierFieldName>,
    object: &mut JsonValue,
) {
    let Some(object) = object.as_object_mut() else {
        return;
    };
    object.retain(|field_name, value| {
        // Remove optional fields that have an empty string as their value.
        let Ok(identifier_field_name) = field_name.parse::<IdentifierFieldName>() else {
            return true;
        };
        if !optional_fields.contains(&identifier_field_name) {
            return true;
        }
        let &mut JsonValue::String(ref s) = value else {
            return true;
        };
        !s.is_empty()
    });
}
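
// A minimal illustrative test module. The module name and cases below are
// additions for exposition (they are not part of the original suite in
// `mod tests`), exercising two self-contained helpers in this file under the
// assumption that `futures`' default executor feature is available.
#[cfg(test)]
mod inline_doc_examples {
    use super::*;

    // remove_empty_string_optional_entries drops an optional field whose
    // value is the empty string and leaves everything else alone.
    #[test]
    fn empty_string_optional_field_is_dropped() {
        let optional_fields: HashSet<IdentifierFieldName> =
            ["nickname".parse().unwrap()].into_iter().collect();
        let mut object = serde_json::json!({
            "name": "",     // not optional: kept even though it's empty
            "nickname": "", // optional + empty string: removed
        });
        remove_empty_string_optional_entries(&optional_fields, &mut object);
        assert_eq!(object, serde_json::json!({ "name": "" }));
    }

    // parse_tables_table turns exported `_tables` rows of the form
    // `{name, id}` into (TableName, TableNumber) pairs.
    #[test]
    fn parse_tables_table_reads_name_and_id() {
        let rows = stream::iter(vec![anyhow::Ok(serde_json::json!({
            "name": "messages",
            "id": 10001,
        }))]);
        let parsed: Vec<(TableName, TableNumber)> =
            futures::executor::block_on(parse_tables_table(rows).try_collect()).unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].0, "messages".parse::<TableName>().unwrap());
    }
}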
