Convex MCP server

Official
by get-convex
committer.rs (54.3 kB)
use std::{ cmp, collections::BTreeSet, ops::Bound, sync::Arc, time::Duration, }; use ::metrics::{ StatusTimer, Timer, }; use anyhow::Context as _; use common::{ backoff::Backoff, bootstrap_model::tables::{ TableMetadata, TableState, TABLES_TABLE, }, components::{ ComponentId, ComponentPath, }, document::{ DocumentUpdateWithPrevTs, ParseDocument, ParsedDocument, ResolvedDocument, }, errors::{ recapture_stacktrace, report_error, DatabaseTimeoutError, }, fastrace_helpers::{ initialize_root_from_parent, EncodedSpan, }, knobs::{ COMMITTER_QUEUE_SIZE, COMMIT_TRACE_THRESHOLD, MAX_REPEATABLE_TIMESTAMP_COMMIT_DELAY, MAX_REPEATABLE_TIMESTAMP_IDLE_FREQUENCY, TRANSACTION_WARN_READ_SET_INTERVALS, }, persistence::{ ConflictStrategy, DocumentLogEntry, Persistence, PersistenceGlobalKey, PersistenceIndexEntry, PersistenceReader, RepeatablePersistence, RetentionValidator, TimestampRange, }, runtime::{ block_in_place, tokio_spawn, Runtime, SpawnHandle, }, shutdown::ShutdownSignal, sync::split_rw_lock::{ Reader, Writer, }, types::{ DatabaseIndexUpdate, DatabaseIndexValue, RepeatableTimestamp, Timestamp, WriteTimestamp, }, value::ResolvedDocumentId, }; use errors::ErrorMetadata; use fastrace::prelude::*; use futures::{ future::{ BoxFuture, Either, }, select_biased, stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt, }; use indexing::index_registry::IndexRegistry; use itertools::Itertools; use parking_lot::Mutex; use prometheus::VMHistogram; use rand::Rng; use tokio::sync::{ mpsc::{ self, error::TrySendError, }, oneshot, }; use tokio_util::task::AbortOnDropHandle; use usage_tracking::FunctionUsageTracker; use value::{ heap_size::{ HeapSize, WithHeapSize, }, id_v6::DeveloperDocumentId, InternalDocumentId, TableMapping, TableName, }; use vector::DocInVectorIndex; use crate::{ bootstrap_model::defaults::BootstrapTableIds, database::ConflictingReadWithWriteSource, metrics::{ self, bootstrap_update_timer, finish_bootstrap_update, next_commit_ts_seconds, table_summary_finish_bootstrap_timer, }, reads::ReadSet, search_index_bootstrap::{ stream_revision_pairs_for_indexes, BootstrappedSearchIndexes, }, snapshot_manager::SnapshotManager, table_summary::{ self, }, transaction::FinalTransaction, write_log::{ index_keys_from_full_documents, LogWriter, PackedDocumentUpdate, PendingWriteHandle, PendingWrites, WriteSource, }, writes::DocumentWrite, ComponentRegistry, Snapshot, Transaction, TransactionReadSet, }; const INITIAL_PERSISTENCE_WRITES_BACKOFF: Duration = Duration::from_secs(1); const MAX_PERSISTENCE_WRITES_BACKOFF: Duration = Duration::from_secs(60); enum PersistenceWrite { Commit { pending_write: PendingWriteHandle, commit_timer: StatusTimer, result: oneshot::Sender<anyhow::Result<Timestamp>>, parent_trace: EncodedSpan, commit_id: usize, }, MaxRepeatableTimestamp { new_max_repeatable: Timestamp, timer: Timer<VMHistogram>, result: oneshot::Sender<Timestamp>, commit_id: usize, }, } impl PersistenceWrite { fn commit_id(&self) -> usize { match self { Self::Commit { commit_id, .. } => *commit_id, Self::MaxRepeatableTimestamp { commit_id, .. } => *commit_id, } } } pub const AFTER_PENDING_WRITE_SNAPSHOT: &str = "after_pending_write_snapshot"; pub struct Committer<RT: Runtime> { // Internal staged commits for conflict checking. pending_writes: PendingWrites, // External log of writes for subscriptions. 
log: LogWriter, snapshot_manager: Writer<SnapshotManager>, persistence: Arc<dyn Persistence>, runtime: RT, last_assigned_ts: Timestamp, persistence_writes: FuturesOrdered<BoxFuture<'static, anyhow::Result<PersistenceWrite>>>, retention_validator: Arc<dyn RetentionValidator>, } impl<RT: Runtime> Committer<RT> { pub(crate) fn start( log: LogWriter, snapshot_manager: Writer<SnapshotManager>, persistence: Arc<dyn Persistence>, runtime: RT, retention_validator: Arc<dyn RetentionValidator>, shutdown: ShutdownSignal, ) -> CommitterClient { let persistence_reader = persistence.reader(); let conflict_checker = PendingWrites::new(persistence_reader.version()); let (tx, rx) = mpsc::channel(*COMMITTER_QUEUE_SIZE); let snapshot_reader = snapshot_manager.reader(); let committer = Self { pending_writes: conflict_checker, log, snapshot_manager, persistence, runtime: runtime.clone(), last_assigned_ts: Timestamp::MIN, persistence_writes: FuturesOrdered::new(), retention_validator: retention_validator.clone(), }; let handle = runtime.spawn("committer", async move { if let Err(err) = committer.go(rx).await { // Committer hit a fatal error. This should only happen if a // persistence write fails or in case of unrecoverable logic // errors. shutdown.signal(err); tracing::error!("Shutting down committer"); } }); CommitterClient { handle: Arc::new(Mutex::new(handle)), sender: tx, persistence_reader, retention_validator, snapshot_reader, } } async fn go(mut self, mut rx: mpsc::Receiver<CommitterMessage>) -> anyhow::Result<()> { let mut last_bumped_repeatable_ts = self.runtime.monotonic_now(); // Assume there were commits just before the backend restarted, so first do a // quick bump. // None means a bump is ongoing. Avoid parallel bumps in case they // commit out of order and regress the repeatable timestamp. let mut next_bump_wait = Some(*MAX_REPEATABLE_TIMESTAMP_COMMIT_DELAY); // This span starts with receiving a commit message and ends with that same // commit getting published. It captures all of the committer activity // in between. let mut committer_span = None; // Keep a monotonically increasing id to keep track of honeycomb traces // Each commit_id tracks a single write to persistence, from the time the commit // message is received until the time the commit has been published. We skip // read-only transactions or commits that fail to validate. let mut commit_id = 0; // Keep track of the commit_id that is currently being traced. let mut span_commit_id = None; loop { let bump_fut = if let Some(wait) = &next_bump_wait { Either::Left( self.runtime .wait(wait.saturating_sub(last_bumped_repeatable_ts.elapsed())), ) } else { Either::Right(std::future::pending()) }; select_biased! { _ = bump_fut.fuse() => { let committer_span = committer_span.get_or_insert_with(|| { span_commit_id = Some(commit_id); Span::root("bump_max_repeatable", SpanContext::random()) }); // Advance the repeatable read timestamp so non-leaders can // establish a recent repeatable snapshot. next_bump_wait = None; let (tx, _rx) = oneshot::channel(); self.bump_max_repeatable_ts(tx, commit_id, committer_span); commit_id += 1; last_bumped_repeatable_ts = self.runtime.monotonic_now(); } result = self.persistence_writes.select_next_some() => { let pending_commit = result.context("Write failed. Unsure if transaction committed to disk.")?; let pending_commit_id = pending_commit.commit_id(); match pending_commit { PersistenceWrite::Commit { pending_write, commit_timer, result, parent_trace, .. 
} => { let parent_span = initialize_root_from_parent("Committer::publish_commit", parent_trace); let publish_commit_span = committer_span.as_ref().map(|root| Span::enter_with_parents("publish_commit", [root, &parent_span])).unwrap_or_else(|| parent_span); let _guard = publish_commit_span.set_local_parent(); let commit_ts = pending_write.must_commit_ts(); self.publish_commit(pending_write); let _ = result.send(Ok(commit_ts)); // When we next get free cycles and there is no ongoing bump, // bump max_repeatable_ts so followers can read this commit. if next_bump_wait.is_some() { next_bump_wait = Some(*MAX_REPEATABLE_TIMESTAMP_COMMIT_DELAY); } commit_timer.finish(); }, PersistenceWrite::MaxRepeatableTimestamp { new_max_repeatable, timer, result, .. } => { let span = committer_span.as_ref().map(|root| Span::enter_with_parent("publish_max_repeatable_ts", root)).unwrap_or_else(Span::noop); span.set_local_parent(); self.publish_max_repeatable_ts(new_max_repeatable)?; let base_period = *MAX_REPEATABLE_TIMESTAMP_IDLE_FREQUENCY; next_bump_wait = Some( self.runtime.rng().random_range(base_period..base_period * 2), ); let _ = result.send(new_max_repeatable); drop(timer); }, } // Report the trace if it is longer than the threshold if let Some(id) = span_commit_id && id == pending_commit_id { if let Some(span) = committer_span.take() { if span.elapsed() < Some(*COMMIT_TRACE_THRESHOLD) { tracing::debug!("Not sending span to honeycomb because it is below the threshold"); span.cancel(); } else { tracing::debug!("Sending trace to honeycomb"); } } } } maybe_message = rx.recv().fuse() => { match maybe_message { None => { tracing::info!("All clients have gone away, shutting down committer..."); return Ok(()); }, Some(CommitterMessage::Commit { queue_timer, transaction, result, write_source, parent_trace, }) => { let parent_span = initialize_root_from_parent("handle_commit_message", parent_trace.clone()) .with_property(|| ("time_in_queue_ms", format!("{}", queue_timer.elapsed().as_secs_f64() * 1000.0))); let committer_span_ref = committer_span.get_or_insert_with(|| { span_commit_id = Some(commit_id); Span::root("commit", SpanContext::random()) }); let start_commit_span = Span::enter_with_parents("start_commit", [committer_span_ref, &parent_span]); let _guard = start_commit_span.set_local_parent(); drop(queue_timer); if let Some(persistence_write_future) = self.start_commit(transaction, result, write_source, parent_trace, commit_id, committer_span_ref) { self.persistence_writes.push_back(persistence_write_future); commit_id += 1; } else if span_commit_id == Some(commit_id) { // If the span_commit_id is the same as the commit_id, that means we created a root span in this block // and it didn't get incremented, so it's not a write to persistence and we should not trace it. // We also need to reset the span_commit_id and committer_span. 
committer_span_ref.cancel(); committer_span = None; span_commit_id = None; } }, #[cfg(any(test, feature = "testing"))] Some(CommitterMessage::BumpMaxRepeatableTs { result }) => { let span = Span::noop(); self.bump_max_repeatable_ts(result, commit_id, &span); commit_id += 1; }, Some(CommitterMessage::FinishTextAndVectorBootstrap { bootstrapped_indexes, bootstrap_ts, result, }) => { self.finish_search_and_vector_bootstrap( bootstrapped_indexes, bootstrap_ts, result ).await; }, Some(CommitterMessage::FinishTableSummaryBootstrap { result, }) => { self.finish_table_summary_bootstrap(result).await; }, Some(CommitterMessage::LoadIndexesIntoMemory { tables, result }) => { let response = self.load_indexes_into_memory(tables).await; let _ = result.send(response); } } }, } } } async fn update_indexes_since_bootstrap( BootstrappedSearchIndexes { text_index_manager, vector_index_manager, tables_with_indexes, }: &mut BootstrappedSearchIndexes, bootstrap_ts: Timestamp, persistence: RepeatablePersistence, registry: &IndexRegistry, ) -> anyhow::Result<()> { let _timer = bootstrap_update_timer(); anyhow::ensure!( !text_index_manager.is_bootstrapping(), "Trying to update search index while it's still bootstrapping" ); anyhow::ensure!( !vector_index_manager.is_bootstrapping(), "Trying to update vector index while it's still bootstrapping" ); let range = TimestampRange::new((Bound::Excluded(bootstrap_ts), Bound::Unbounded)); let revision_stream = stream_revision_pairs_for_indexes(tables_with_indexes, &persistence, range); futures::pin_mut!(revision_stream); let mut num_revisions = 0; let mut total_size = 0; while let Some(revision_pair) = revision_stream.try_next().await? { num_revisions += 1; total_size += revision_pair.document().map(|d| d.size()).unwrap_or(0); text_index_manager.update( registry, revision_pair.prev_document(), revision_pair.document(), WriteTimestamp::Committed(revision_pair.ts()), )?; vector_index_manager.update( registry, revision_pair.prev_document(), revision_pair.document(), WriteTimestamp::Committed(revision_pair.ts()), )?; } finish_bootstrap_update(num_revisions, total_size); Ok(()) } async fn finish_search_and_vector_bootstrap( &mut self, mut bootstrapped_indexes: BootstrappedSearchIndexes, bootstrap_ts: RepeatableTimestamp, result: oneshot::Sender<anyhow::Result<()>>, ) { let (last_snapshot, latest_ts) = { let snapshot_manager = self.snapshot_manager.read(); ( snapshot_manager.latest_snapshot(), snapshot_manager.latest_ts(), ) }; if latest_ts > bootstrap_ts { let repeatable_persistence = RepeatablePersistence::new( self.persistence.reader(), latest_ts, self.retention_validator.clone(), ); let res = Self::update_indexes_since_bootstrap( &mut bootstrapped_indexes, *bootstrap_ts, repeatable_persistence, &last_snapshot.index_registry, ) .await; if res.is_err() { let _ = result.send(res); return; } } // Committer is currently single threaded, so commits should be blocked until we // finish and the timestamp shouldn't be able to advance. 
let mut snapshot_manager = self.snapshot_manager.write(); if latest_ts != snapshot_manager.latest_ts() { panic!("Snapshots were changed concurrently during commit?"); } snapshot_manager.overwrite_last_snapshot_text_and_vector_indexes( bootstrapped_indexes.text_index_manager, bootstrapped_indexes.vector_index_manager, &mut self.pending_writes, ); tracing::info!("Committed backfilled vector indexes"); let _ = result.send(Ok(())); } async fn finish_table_summary_bootstrap( &mut self, result: oneshot::Sender<anyhow::Result<()>>, ) { let _timer = table_summary_finish_bootstrap_timer(); let latest_ts = { let snapshot_manager = self.snapshot_manager.read(); snapshot_manager.latest_ts() }; // This gets called by the TableSummaryWorker when it has successfully // checkpointed a TableSummarySnapshot. // Walk any changes since the last checkpoint, and update the snapshot manager // with the new TableSummarySnapshot. let bootstrap_result = table_summary::bootstrap( self.runtime.clone(), self.persistence.reader(), self.retention_validator.clone(), latest_ts, table_summary::BootstrapKind::FromCheckpoint, ) .await; let (table_summary_snapshot, _) = match bootstrap_result { Ok(res) => res, Err(err) => { let _ = result.send(Err(err)); return; }, }; // Committer is currently single threaded, so commits should be blocked until we // finish and the timestamp shouldn't be able to advance. let mut snapshot_manager = self.snapshot_manager.write(); if latest_ts != snapshot_manager.latest_ts() { panic!("Snapshots were changed concurrently during commit?"); } snapshot_manager.overwrite_last_snapshot_table_summary( table_summary_snapshot, &mut self.pending_writes, ); tracing::info!("Bootstrapped table summaries at ts {}", latest_ts); let _ = result.send(Ok(())); } // This blocks the committer and loads the in-memory indexes for the latest // snapshot in memory. A potential further improvement is to pick a base // timestamp and load the indexes at that timestamp outside of the committer. // The committer can then replay recent writes to derive the latest in-memory // indexes. This would either need to do another database query for the log // or rely on the write log to not have trimmed the base timestamp yet. async fn load_indexes_into_memory( &mut self, tables: BTreeSet<TableName>, ) -> anyhow::Result<()> { let (last_snapshot, latest_ts) = { let snapshot_manager = self.snapshot_manager.read(); ( snapshot_manager.latest_snapshot(), snapshot_manager.latest_ts(), ) }; let repeatable_persistence = RepeatablePersistence::new( self.persistence.reader(), latest_ts, self.retention_validator.clone(), ); let mut in_memory_indexes = last_snapshot.in_memory_indexes.clone(); in_memory_indexes .load_enabled_for_tables( &last_snapshot.index_registry, last_snapshot.table_mapping(), &repeatable_persistence.read_snapshot(latest_ts)?, &tables, ) .await?; // Committer is currently single threaded, so commits should be blocked until we // finish and the timestamp shouldn't be able to advance. 
let mut snapshot_manager = self.snapshot_manager.write(); if latest_ts != snapshot_manager.latest_ts() { panic!("Snapshots were changed concurrently during commit?"); } snapshot_manager .overwrite_last_snapshot_in_memory_indexes(in_memory_indexes, &mut self.pending_writes); tracing::info!("Loaded indexes into memory"); Ok(()) } fn bump_max_repeatable_ts( &mut self, result: oneshot::Sender<Timestamp>, commit_id: usize, root_span: &Span, ) { let timer = metrics::bump_repeatable_ts_timer(); // next_max_repeatable_ts bumps the last_assigned_ts, so all future commits on // this committer will be after new_max_repeatable. let new_max_repeatable = self .next_max_repeatable_ts() .expect("new_max_repeatable should exist"); let persistence = self.persistence.clone(); let span = Span::enter_with_parent("bump_max_repeatable_ts", root_span); let runtime = self.runtime.clone(); self.persistence_writes.push_back( async move { // The MaxRepeatableTimestamp persistence global ensures all future // commits on future leaders will be after new_max_repeatable, and followers // can know this timestamp is repeatable. // If we fail to bump the timestamp, we'll backoff and retry // which will block the committer from making forward progress until we // succceed. We don't want to kill the committer and reload the // instance if we can avoid it, as that would exacerbate any // load-related issues. let mut backoff = Backoff::new( INITIAL_PERSISTENCE_WRITES_BACKOFF, MAX_PERSISTENCE_WRITES_BACKOFF, ); loop { match persistence .write_persistence_global( PersistenceGlobalKey::MaxRepeatableTimestamp, new_max_repeatable.into(), ) .await { Ok(()) => { backoff.reset(); break; }, Err(mut e) => { let delay = backoff.fail(&mut runtime.rng()); report_error(&mut e).await; tracing::error!( "Failed to bump max repeatable timestamp, retrying after {:.2}s", delay.as_secs_f32() ); runtime.wait(delay).await; continue; }, } } Ok(PersistenceWrite::MaxRepeatableTimestamp { new_max_repeatable, timer, result, commit_id, }) } .in_span(span) .boxed(), ); } fn publish_max_repeatable_ts(&mut self, new_max_repeatable: Timestamp) -> anyhow::Result<()> { // Bump the latest snapshot in snapshot_manager so reads on this leader // can know this timestamp is repeatable. let mut snapshot_manager = self.snapshot_manager.write(); if snapshot_manager.bump_persisted_max_repeatable_ts(new_max_repeatable)? { self.log.append( new_max_repeatable, WithHeapSize::default(), "publish_max_repeatable_ts".into(), ); } Ok(()) } /// First, check that it's valid to apply this transaction in-memory. If it /// passes validation, we can rebase the transaction to a new timestamp /// if other transactions have committed. #[fastrace::trace] fn validate_commit( &mut self, transaction: FinalTransaction, write_source: WriteSource, ) -> anyhow::Result<ValidatedCommit> { let commit_ts = self.next_commit_ts()?; let timer = metrics::commit_is_stale_timer(); if let Some(conflicting_read) = self.commit_has_conflict( transaction.reads.read_set(), *transaction.begin_timestamp, commit_ts, )? 
{ anyhow::bail!(conflicting_read.into_error(&transaction.table_mapping, &write_source)); } timer.finish(); let updates: Vec<_> = transaction.writes.coalesced_writes().collect(); // The updates are ordered using table_dependency_sort_key, // which is the same order they should be applied to database metadata // and index data structures let mut ordered_updates = updates; ordered_updates.sort_by_key(|(id, update)| { table_dependency_sort_key( BootstrapTableIds::new(&transaction.table_mapping), InternalDocumentId::from(**id), update.new_document.as_ref(), ) }); let (document_writes, index_writes, snapshot) = self.compute_writes(commit_ts, &ordered_updates)?; // Append the updates to pending_writes, so future conflicting commits // will fail the `commit_has_conflict` check above, even before // this transaction writes to persistence or is visible to reads. Note that // this can cause theoretical false conflicts, where transaction has a conflict // with another one, and the latter never ended up committing. This // should be very rare, and false positives are acceptable by design. let timer = metrics::pending_writes_append_timer(); let pending_write = self.pending_writes.push_back( commit_ts, ordered_updates .into_iter() .map(|(&id, update)| (id, PackedDocumentUpdate::pack(update))) .collect(), write_source, snapshot, ); drop(timer); Ok(ValidatedCommit { index_writes, document_writes, pending_write, }) } #[fastrace::trace] fn compute_writes( &self, commit_ts: Timestamp, ordered_updates: &Vec<(&ResolvedDocumentId, &DocumentUpdateWithPrevTs)>, ) -> anyhow::Result<( Vec<ValidatedDocumentWrite>, BTreeSet<(Timestamp, DatabaseIndexUpdate)>, Snapshot, )> { let timer = metrics::commit_prepare_writes_timer(); let mut document_writes = Vec::new(); let mut index_writes = Vec::new(); // We have to compute the new snapshot from the latest pending snapshot in case // there are pending writes that need to be included. We have already // checked for conflicts, so the latest pending snapshot must // have the same tables and indexes as the base snapshot and the final // publishing snapshot. Therefore index writes can be computed from the // latest pending snapshot. let mut latest_pending_snapshot = self .pending_writes .latest_snapshot() .unwrap_or_else(|| self.snapshot_manager.read().latest_snapshot()); for &(id, document_update) in ordered_updates.iter() { let (updates, doc_in_vector_index) = latest_pending_snapshot.update(document_update, commit_ts)?; index_writes.extend(updates); document_writes.push(ValidatedDocumentWrite { commit_ts, id: (*id).into(), write: DocumentWrite { document: document_update.new_document.clone(), }, doc_in_vector_index, prev_ts: document_update.old_document.as_ref().map(|&(_, ts)| ts), }); } let index_writes = index_writes .into_iter() .map(|index_update| (commit_ts, index_update)) .collect(); timer.finish(); Ok((document_writes, index_writes, latest_pending_snapshot)) } #[fastrace::trace] fn commit_has_conflict( &self, reads: &ReadSet, reads_ts: Timestamp, commit_ts: Timestamp, ) -> anyhow::Result<Option<ConflictingReadWithWriteSource>> { if let Some(conflicting_read) = self.log.is_stale(reads, reads_ts, commit_ts)? { return Ok(Some(conflicting_read)); } if let Some(conflicting_read) = self.pending_writes.is_stale(reads, reads_ts, commit_ts)? { return Ok(Some(conflicting_read)); } Ok(None) } /// Commit the transaction to persistence (without the lock held). /// This is the commit point of a transaction. If this succeeds, the /// transaction must be published and made visible. 
If we are unsure whether /// the write went through, we crash the process and recover from whatever /// has been written to persistence. async fn write_to_persistence( persistence: Arc<dyn Persistence>, index_writes: Arc<Vec<PersistenceIndexEntry>>, document_writes: Arc<Vec<DocumentLogEntry>>, ) -> anyhow::Result<()> { let timer = metrics::commit_persistence_write_timer(); persistence .write( document_writes.as_slice(), &index_writes, ConflictStrategy::Error, ) .await .context("Commit failed to write to persistence")?; timer.finish(); Ok(()) } /// After writing the new rows to persistence, mark the commit as complete /// and allow the updated rows to be read by other transactions. #[fastrace::trace] fn publish_commit(&mut self, pending_write: PendingWriteHandle) { let apply_timer = metrics::commit_apply_timer(); let commit_ts = pending_write.must_commit_ts(); let (ordered_updates, write_source, new_snapshot) = match self.pending_writes.pop_first(pending_write) { None => panic!("commit at {commit_ts} not pending"), Some((ts, document_updates, write_source, snapshot)) => { if ts != commit_ts { panic!("commits out of order {ts} != {commit_ts}"); } (document_updates, write_source, snapshot) }, }; // Write transaction state at the commit ts to the document store. metrics::commit_rows(ordered_updates.len() as u64); let timer = metrics::pending_writes_to_write_log_timer(); // See the comment in `overlaps_index_keys` for why it’s safe // to use indexes from the current snapshot. let writes = index_keys_from_full_documents(ordered_updates, &new_snapshot.index_registry); let size = writes.heap_size(); drop(timer); metrics::write_log_commit_bytes(size); let timer = metrics::write_log_append_timer(); self.log.append(commit_ts, writes, write_source); drop(timer); if let Some(table_summaries) = new_snapshot.table_summaries.as_ref() { metrics::log_num_keys(table_summaries.num_user_documents); metrics::log_document_store_size(table_summaries.user_size); } // Publish the new version of our database metadata and the index. let mut snapshot_manager = self.snapshot_manager.write(); snapshot_manager.push(commit_ts, new_snapshot); apply_timer.finish(); } #[fastrace::trace] /// Returns a future to add to the pending_writes queue, if the commit /// should be written. fn start_commit( &mut self, transaction: FinalTransaction, result: oneshot::Sender<anyhow::Result<Timestamp>>, write_source: WriteSource, parent_trace: EncodedSpan, commit_id: usize, root_span: &Span, ) -> Option<BoxFuture<'static, anyhow::Result<PersistenceWrite>>> { // Skip read-only transactions. if transaction.is_readonly() { let _ = result.send(Ok(*transaction.begin_timestamp)); return None; } let commit_timer = metrics::commit_timer(); metrics::log_write_tx(&transaction); // Trace if the transaction has a lot of read intervals. 
if transaction.reads.num_intervals() > *TRANSACTION_WARN_READ_SET_INTERVALS { tracing::warn!( "Transaction with write source {write_source:?} has {} read intervals: {}", transaction.reads.num_intervals(), transaction.reads.top_three_intervals() ); } let table_mapping = transaction.table_mapping.clone(); let component_registry = transaction.component_registry.clone(); let usage_tracking = transaction.usage_tracker.clone(); let ValidatedCommit { index_writes, document_writes, pending_write, } = match block_in_place(|| self.validate_commit(transaction, write_source)) { Ok(v) => v, Err(e) => { let _ = result.send(Err(e)); return None; }, }; // necessary because this value is moved let parent_trace_copy = parent_trace.clone(); let persistence = self.persistence.clone(); let request_span = initialize_root_from_parent("Committer::persistence_writes_future", parent_trace); let outer_span = Span::enter_with_parents("outer_write_commit", [root_span, &request_span]); let pause_client = self.runtime.pause_client(); let rt = self.runtime.clone(); Some( async move { Self::track_commit( usage_tracking, &index_writes, &document_writes, &table_mapping, &component_registry, ); let mut backoff = Backoff::new( INITIAL_PERSISTENCE_WRITES_BACKOFF, MAX_PERSISTENCE_WRITES_BACKOFF, ); let document_writes = Arc::new( document_writes .into_iter() .map(|write| DocumentLogEntry { ts: write.commit_ts, id: write.id, value: write.write.document, prev_ts: write.prev_ts, }) .collect_vec(), ); let index_writes = Arc::new( index_writes .into_iter() .map(|(ts, update)| PersistenceIndexEntry::from_index_update(ts, &update)) .collect_vec(), ); loop { // Inline try_join so we don't recapture the stacktrace on error let name = "Commit::write_to_persistence"; let handle = AbortOnDropHandle::new(tokio_spawn( name, Self::write_to_persistence( persistence.clone(), index_writes.clone(), document_writes.clone(), ) .in_span(Span::enter_with_local_parent(name)), )); if let Err(mut e) = handle.await? { if e.is::<DatabaseTimeoutError>() { let delay = backoff.fail(&mut rt.rng()); tracing::error!( "Failed to write to persistence because database timed out" ); report_error(&mut e).await; rt.wait(delay).await; } else { return Err(e); } } else { pause_client.wait(AFTER_PENDING_WRITE_SNAPSHOT).await; return Ok(PersistenceWrite::Commit { pending_write, commit_timer, result, parent_trace: parent_trace_copy, commit_id, }); } } } .in_span(outer_span) .in_span(request_span) .boxed(), ) } #[fastrace::trace] fn track_commit( usage_tracker: FunctionUsageTracker, index_writes: &BTreeSet<(Timestamp, DatabaseIndexUpdate)>, document_writes: &Vec<ValidatedDocumentWrite>, table_mapping: &TableMapping, component_registry: &ComponentRegistry, ) { for (_, index_write) in index_writes { if let DatabaseIndexValue::NonClustered(doc) = index_write.value { let tablet_id = doc.tablet_id; let Ok(table_namespace) = table_mapping.tablet_namespace(tablet_id) else { continue; }; let component_id = ComponentId::from(table_namespace); let component_path = component_registry .get_component_path(component_id, &mut TransactionReadSet::new()) // It's possible that the component gets deleted in this transaction. In that case, miscount the usage as root. 
.unwrap_or(ComponentPath::root()); if let Ok(table_name) = table_mapping.tablet_name(tablet_id) { // Index metadata is never a vector // Database bandwidth for index writes usage_tracker.track_database_ingress_size( component_path, table_name.to_string(), index_write.key.size() as u64, // Exclude indexes on system tables or reserved system indexes on user // tables table_name.is_system() || index_write.is_system_index, ); } } } for validated_write in document_writes { let ValidatedDocumentWrite { id: document_id, write: DocumentWrite { document }, doc_in_vector_index, .. } = validated_write; if let Some(document) = document { let document_write_size = document_id.size() + document.size(); let tablet_id = document.id().tablet_id; let Ok(table_namespace) = table_mapping.tablet_namespace(tablet_id) else { continue; }; let component_id = ComponentId::from(table_namespace); let component_path = component_registry .get_component_path(component_id, &mut TransactionReadSet::new()) // It's possible that the component gets deleted in this transaction. In that case, miscount the usage as root. .unwrap_or(ComponentPath::root()); if let Ok(table_name) = table_mapping.tablet_name(tablet_id) { // Database bandwidth for document writes if *doc_in_vector_index == DocInVectorIndex::Absent { usage_tracker.track_database_ingress_size( component_path, table_name.to_string(), document_write_size as u64, table_name.is_system(), ); } else { usage_tracker.track_vector_ingress_size( component_path, table_name.to_string(), document_write_size as u64, table_name.is_system(), ); } } } } } fn next_commit_ts(&mut self) -> anyhow::Result<Timestamp> { let _timer = next_commit_ts_seconds(); let latest_ts = self.snapshot_manager.read().latest_ts(); let max = cmp::max( latest_ts.succ()?, cmp::max( self.runtime.generate_timestamp()?, self.last_assigned_ts.succ()?, ), ); self.last_assigned_ts = max; Ok(max) } fn next_max_repeatable_ts(&mut self) -> anyhow::Result<Timestamp> { if let Some(min_pending) = self.pending_writes.min_ts() { // If there's a pending write, push max_repeatable_ts to be right // before the pending write, so followers can choose recent // timestamps but can't read at the timestamp of the pending write. anyhow::ensure!(min_pending <= self.last_assigned_ts); min_pending.pred() } else { // If there are no pending writes, bump last_assigned_ts and write // to persistence and snapshot manager as a commit would. self.next_commit_ts() } } } struct ValidatedDocumentWrite { commit_ts: Timestamp, id: InternalDocumentId, write: DocumentWrite, doc_in_vector_index: DocInVectorIndex, prev_ts: Option<Timestamp>, } #[derive(Clone)] pub struct CommitterClient { handle: Arc<Mutex<Box<dyn SpawnHandle>>>, sender: mpsc::Sender<CommitterMessage>, persistence_reader: Arc<dyn PersistenceReader>, retention_validator: Arc<dyn RetentionValidator>, snapshot_reader: Reader<SnapshotManager>, } impl CommitterClient { pub async fn finish_search_and_vector_bootstrap( &self, bootstrapped_indexes: BootstrappedSearchIndexes, bootstrap_ts: RepeatableTimestamp, ) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); let message = CommitterMessage::FinishTextAndVectorBootstrap { bootstrapped_indexes, bootstrap_ts, result: tx, }; self.sender.try_send(message).map_err(|e| match e { TrySendError::Full(..) => metrics::committer_full_error().into(), TrySendError::Closed(..) => metrics::shutdown_error(), })?; // The only reason we might fail here if the committer is shutting down. rx.await.map_err(|_| metrics::shutdown_error())? 
} pub async fn finish_table_summary_bootstrap(&self) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); let message = CommitterMessage::FinishTableSummaryBootstrap { result: tx }; self.sender.try_send(message).map_err(|e| match e { TrySendError::Full(..) => metrics::committer_full_error().into(), TrySendError::Closed(..) => metrics::shutdown_error(), })?; // The only reason we might fail here if the committer is shutting down. rx.await.map_err(|_| metrics::shutdown_error())? } // Tell the committer to load all indexes for the given tables into memory. pub async fn load_indexes_into_memory( &self, tables: BTreeSet<TableName>, ) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); let message = CommitterMessage::LoadIndexesIntoMemory { tables, result: tx }; self.sender.try_send(message).map_err(|e| match e { TrySendError::Full(..) => metrics::committer_full_error().into(), TrySendError::Closed(..) => metrics::shutdown_error(), })?; // The only reason we might fail here if the committer is shutting down. rx.await.map_err(|_| metrics::shutdown_error())? } pub fn commit<RT: Runtime>( &self, transaction: Transaction<RT>, write_source: WriteSource, ) -> BoxFuture<'_, anyhow::Result<Timestamp>> { self._commit(transaction, write_source).boxed() } #[fastrace::trace] async fn _commit<RT: Runtime>( &self, transaction: Transaction<RT>, write_source: WriteSource, ) -> anyhow::Result<Timestamp> { let _timer = metrics::commit_client_timer(transaction.identity()); self.check_generated_ids(&transaction).await?; // Finish reading everything from persistence. let transaction = transaction.finalize(self.snapshot_reader.clone()).await?; let queue_timer = metrics::commit_queue_timer(); let (tx, rx) = oneshot::channel(); let message = CommitterMessage::Commit { queue_timer, transaction, result: tx, write_source, parent_trace: EncodedSpan::from_parent(), }; self.sender.try_send(message).map_err(|e| match e { TrySendError::Full(..) => metrics::committer_full_error().into(), TrySendError::Closed(..) => metrics::shutdown_error(), })?; let Ok(result) = rx.await else { anyhow::bail!(metrics::shutdown_error()); }; if let Err(e) = result { return Err(recapture_stacktrace(e).await); } result } pub fn shutdown(&self) { self.handle.lock().shutdown(); } #[cfg(any(test, feature = "testing"))] pub async fn bump_max_repeatable_ts(&self) -> anyhow::Result<Timestamp> { let (tx, rx) = oneshot::channel(); let message = CommitterMessage::BumpMaxRepeatableTs { result: tx }; self.sender .try_send(message) .map_err(|e| anyhow::anyhow!("{e}"))?; Ok(rx.await?) } async fn check_generated_ids<RT: Runtime>( &self, transaction: &Transaction<RT>, ) -> anyhow::Result<()> { // Check that none of the DocumentIds generated in this transaction // are already in use. // We can check at the begin_timestamp+1 because generated_ids are also // checked for conflict against all writes after begin_timestamp. 
let ts = transaction.begin_timestamp().succ()?; let timer = metrics::commit_id_reuse_timer(); let generated_ids = transaction.writes.as_flat()?.generated_ids(); if !generated_ids.is_empty() { let repeatable_persistence = RepeatablePersistence::new( self.persistence_reader.clone(), transaction.begin_timestamp(), self.retention_validator.clone(), ); let generated_ids_with_ts: BTreeSet<_> = generated_ids .iter() .map(|id| (InternalDocumentId::from(*id), ts)) .collect(); let mut previous_revisions_of_ids = repeatable_persistence .previous_revisions(generated_ids_with_ts) .await?; if let Some(( (document_id, _), DocumentLogEntry { value: maybe_doc, .. }, )) = previous_revisions_of_ids.pop_first() { let display_id = generated_ids .iter() .find(|id| InternalDocumentId::from(**id) == document_id) .map(|id| DeveloperDocumentId::from(*id).encode()) .unwrap_or(document_id.to_string()); if maybe_doc.is_none() { anyhow::bail!(ErrorMetadata::bad_request( "DocumentDeleted", format!( "Cannot recreate document with _id {display_id} that was deleted. Try \ to insert it without an _id or insert into another table." ), )); } else { anyhow::bail!(ErrorMetadata::bad_request( "DocumentExists", format!( "Cannot create document with _id {display_id} that already exists. \ Try to update it with `db.patch` or `db.replace`." ), )); } } } timer.finish(); Ok(()) } } enum CommitterMessage { Commit { queue_timer: Timer<VMHistogram>, transaction: FinalTransaction, result: oneshot::Sender<anyhow::Result<Timestamp>>, write_source: WriteSource, parent_trace: EncodedSpan, }, #[cfg(any(test, feature = "testing"))] BumpMaxRepeatableTs { result: oneshot::Sender<Timestamp> }, LoadIndexesIntoMemory { tables: BTreeSet<TableName>, result: oneshot::Sender<anyhow::Result<()>>, }, FinishTextAndVectorBootstrap { bootstrapped_indexes: BootstrappedSearchIndexes, bootstrap_ts: RepeatableTimestamp, result: oneshot::Sender<anyhow::Result<()>>, }, FinishTableSummaryBootstrap { result: oneshot::Sender<anyhow::Result<()>>, }, } // Within a single transaction that writes multiple documents, this is the order // in which we write them. // // Dependencies: // - _tables table created before other tables. // - table created before its indexes. // - indexes created before documents in the table. // - indexes deleted before other indexes created, in case of naming conflicts. // - tables deleted before other tables created, in case of naming conflicts. // - indexes on a table deleted before the table itself. pub fn table_dependency_sort_key( bootstrap_tables: BootstrapTableIds, id: InternalDocumentId, update: Option<&ResolvedDocument>, ) -> (usize, InternalDocumentId) { let table = id.table(); let sort_key = if table == bootstrap_tables.tables_id.tablet_id { match update { Some(insertion) => { let table_metadata: ParsedDocument<TableMetadata> = insertion.parse().unwrap_or_else(|e| { panic!("Writing invalid TableMetadata {}: {e}", insertion.value().0) }); match table_metadata.state { TableState::Active => { if &table_metadata.name == &*TABLES_TABLE { // In bootstrapping, create _tables table first. 2 } else { // Create other tables, especially the _index table, next. 3 } }, TableState::Hidden => 3, // Deleting index must come before table deletion, // so we can delete the table.by_id index while the table still exists. TableState::Deleting => 1, } }, // Legacy method of deleting _tables, supported here when walking the log. 
            None => 1,
        }
    } else if table == bootstrap_tables.index_id.tablet_id {
        if update.is_none() {
            // Index deletes come first, in case one is being deleted and another
            // created with the same name.
            0
        } else {
            4
        }
    } else {
        5
    };
    (sort_key, id)
}

struct ValidatedCommit {
    index_writes: BTreeSet<(Timestamp, DatabaseIndexUpdate)>,
    document_writes: Vec<ValidatedDocumentWrite>,
    pending_write: PendingWriteHandle,
}
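
The core correctness check in `commit_has_conflict` above is serializable optimistic concurrency control: a transaction that read at `reads_ts` and wants to commit at `commit_ts` must abort if any key in its read set was written by another commit in the window (reads_ts, commit_ts], whether that write is already in the write log or still pending. Below is a minimal, self-contained sketch of that rule using illustrative stand-in types; it is not the crate's actual ReadSet, write-log, or PendingWrites implementation.

// Minimal, self-contained sketch of the conflict rule; the types here are
// illustrative stand-ins, not the real ReadSet or write-log types from this file.
use std::collections::BTreeMap;

type Ts = u64;
type Key = String;

// For each key, the timestamps at which other transactions committed writes to it.
struct SketchWriteLog(BTreeMap<Key, Vec<Ts>>);

impl SketchWriteLog {
    // Returns a key from `reads` that was overwritten in (reads_ts, commit_ts],
    // i.e. a read that makes the committing transaction stale.
    fn is_stale(&self, reads: &[Key], reads_ts: Ts, commit_ts: Ts) -> Option<&Key> {
        self.0
            .iter()
            .filter(|(key, _)| reads.contains(*key))
            .find(|(_, writes)| writes.iter().any(|&ts| ts > reads_ts && ts <= commit_ts))
            .map(|(key, _)| key)
    }
}

fn main() {
    let mut log = SketchWriteLog(BTreeMap::new());
    log.0.insert("users/alice".to_string(), vec![5, 9]);

    // A transaction read "users/alice" at ts 7; another commit wrote it at ts 9,
    // so committing at ts 10 conflicts and must be retried at a fresh timestamp.
    assert!(log.is_stale(&["users/alice".to_string()], 7, 10).is_some());

    // A transaction that only read "users/bob" is unaffected by that write.
    assert!(log.is_stale(&["users/bob".to_string()], 7, 10).is_none());
}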

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'
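
The curl call above can also be scripted; here is a minimal Rust sketch of the same GET request, assuming the reqwest and tokio crates (neither is part of this repository):

// Minimal sketch: fetch this server's MCP directory entry as text.
// Assumes the `reqwest` and `tokio` crates are added to Cargo.toml.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = reqwest::get("https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend")
        .await?
        .text()
        .await?;
    println!("{body}");
    Ok(())
}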

If you have feedback or need assistance with the MCP directory API, please join our Discord server.