Skip to main content
Glama

Convex MCP server

Official
by get-convex
write_log.rs (26.4 kB)
//! Write log and pending-write tracking.
//!
//! [`WriteLog`] keeps recent committed writes (reduced to index keys) so new
//! commits can be checked for OCC conflicts and read-set tokens can be
//! refreshed. [`PendingWrites`] keeps commits that have started but have not
//! finished being written, with their full documents.

use std::{
    borrow::Cow,
    collections::{
        BTreeMap,
        VecDeque,
    },
    sync::Arc,
};

use common::{
    document::{
        DocumentUpdate,
        DocumentUpdateRef,
        PackedDocument,
    },
    document_index_keys::DocumentIndexKeys,
    knobs::{
        WRITE_LOG_MAX_RETENTION_SECS,
        WRITE_LOG_MIN_RETENTION_SECS,
        WRITE_LOG_SOFT_MAX_SIZE_BYTES,
    },
    runtime::block_in_place,
    types::{
        PersistenceVersion,
        Timestamp,
    },
    value::ResolvedDocumentId,
};
use errors::{
    ErrorMetadata,
    ErrorMetadataAnyhowExt,
};
use futures::Future;
use imbl::Vector;
use indexing::index_registry::IndexRegistry;
use parking_lot::Mutex;
use search::query::tokenize;
use tokio::sync::oneshot;
use value::heap_size::{
    HeapSize,
    WithHeapSize,
};

use crate::{
    database::ConflictingReadWithWriteSource,
    metrics,
    reads::ReadSet,
    Snapshot,
    Token,
};

/// A single document write whose old and new versions are stored as
/// [`PackedDocument`]s.
#[derive(Clone)]
pub struct PackedDocumentUpdate {
    /// The id of the written document.
    pub id: ResolvedDocumentId,
    /// The version before the write, if any.
    pub old_document: Option<PackedDocument>,
    /// The version after the write, or `None` if the write deleted it.
    pub new_document: Option<PackedDocument>,
}

impl HeapSize for PackedDocumentUpdate {
    // Only the packed documents are counted; `id` is assumed to own no heap
    // memory.
    fn heap_size(&self) -> usize {
        self.old_document.heap_size() + self.new_document.heap_size()
    }
}

/// The full-document writes of one commit, in commit order, keyed by document
/// id. This is the representation held by `PendingWrites`.
type OrderedDocumentWrites = WithHeapSize<Vec<(ResolvedDocumentId, PackedDocumentUpdate)>>;

impl PackedDocumentUpdate {
    /// Packs both versions of a borrowed document update.
    pub fn pack(update: &impl DocumentUpdateRef) -> Self {
        Self {
            id: update.id(),
            old_document: update.old_document().map(PackedDocument::pack),
            new_document: update.new_document().map(PackedDocument::pack),
        }
    }

    /// Rebuilds an owned [`DocumentUpdate`] by unpacking both versions.
    pub fn unpack(&self) -> DocumentUpdate {
        DocumentUpdate {
            id: self.id,
            old_document: self.old_document.as_ref().map(|doc| doc.unpack()),
            new_document: self.new_document.as_ref().map(|doc| doc.unpack()),
        }
    }
}

/// Iterator over the index-key writes of a single write-log entry.
pub type IterWrites<'a> = std::slice::Iter<'a, (ResolvedDocumentId, DocumentIndexKeysUpdate)>;

/// A document write reduced to the index keys of its old and new versions.
/// This is what `WriteLog` retains instead of full documents.
#[derive(Clone)]
pub struct DocumentIndexKeysUpdate {
    /// The id of the written document.
    pub id: ResolvedDocumentId,
    /// Index keys of the version before the write, if any.
    pub old_document_keys: Option<DocumentIndexKeys>,
    /// Index keys of the version after the write, or `None` on delete.
    pub new_document_keys: Option<DocumentIndexKeys>,
}

impl DocumentIndexKeysUpdate {
    /// Computes index keys for both versions of `full` using
    /// `index_registry`. `search::query::tokenize` is passed through as the
    /// tokenizer (presumably for text-search index keys).
    pub fn from_document_update(
        full: PackedDocumentUpdate,
        index_registry: &IndexRegistry,
    ) -> Self {
        Self {
            id: full.id,
            old_document_keys: full
                .old_document
                .map(|old_doc| index_registry.document_index_keys(old_doc, tokenize)),
            new_document_keys: full
                .new_document
                .map(|new_doc| index_registry.document_index_keys(new_doc, tokenize)),
        }
    }
}

impl HeapSize for DocumentIndexKeysUpdate {
    fn heap_size(&self) -> usize {
        self.old_document_keys.heap_size() + self.new_document_keys.heap_size()
    }
}

/// The index-key writes of one commit, keyed by document id. This is the
/// representation stored in each `WriteLog` entry.
type OrderedIndexKeyWrites = WithHeapSize<Vec<(ResolvedDocumentId, DocumentIndexKeysUpdate)>>;

/// Converts [OrderedDocumentWrites] (the log used in `PendingWrites` that
/// contains full documents) to [OrderedIndexKeyWrites] (the log used
/// in `WriteLog` that contains only index keys).
///
/// The order of the writes is preserved.
pub fn index_keys_from_full_documents(
    ordered_writes: OrderedDocumentWrites,
    index_registry: &IndexRegistry,
) -> OrderedIndexKeyWrites {
    let elements: Vec<_> = ordered_writes
        .into_iter()
        .map(|(id, update)| {
            (
                id,
                DocumentIndexKeysUpdate::from_document_update(update, index_registry),
            )
        })
        .collect();
    WithHeapSize::from(elements)
}

/// Optional label describing where a commit came from. It is stored alongside
/// every write-log and pending-write entry.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WriteSource(pub(crate) Option<Cow<'static, str>>);

impl WriteSource {
    /// A source with no label.
    pub fn unknown() -> Self {
        Self(None)
    }

    /// A source with the given label.
    pub fn new(source: impl Into<Cow<'static, str>>) -> Self {
        Self(Some(source.into()))
    }
}

impl From<Option<String>> for WriteSource {
    fn from(value: Option<String>) -> Self {
        Self(value.map(|value| value.into()))
    }
}

impl From<String> for WriteSource {
    fn from(value: String) -> Self {
        Self(Some(value.into()))
    }
}

impl From<&'static str> for WriteSource {
    fn from(value: &'static str) -> Self {
        Self(Some(value.into()))
    }
}

impl HeapSize for WriteSource {
    // Only owned labels allocate; a borrowed `'static` label contributes 0.
    fn heap_size(&self) -> usize {
        self.0
            .as_ref()
            .filter(|value| value.is_owned())
            .map(|value| value.len())
            .unwrap_or_default()
    }
}

/// State shared (behind a mutex) by the owner, reader and writer handles: the
/// write log plus tasks waiting for it to advance.
struct WriteLogManager {
    log: WriteLog,
    /// `(target_ts, sender)` pairs. Each sender fires once the log's max
    /// timestamp is strictly greater than `target_ts`.
    waiters: VecDeque<(Timestamp, oneshot::Sender<()>)>,
}

impl WriteLogManager {
    /// Creates a manager with an empty log starting at `initial_timestamp`
    /// and no waiters.
    fn new(initial_timestamp: Timestamp) -> Self {
        let log = WriteLog::new(initial_timestamp);
        let waiters = VecDeque::new();
        Self { log, waiters }
    }

    /// Wakes and removes every waiter whose target timestamp is strictly below
    /// the log's max timestamp. Also removes waiters whose receiver has been
    /// dropped. The swap-remove means the order of the remaining waiters is
    /// not preserved.
    fn notify_waiters(&mut self) {
        let ts = self.log.max_ts();
        // Notify waiters
        let mut i = 0;
        while i < self.waiters.len() {
            if ts > self.waiters[i].0 || self.waiters[i].1.is_closed() {
                // Remove from the waiters.
                let w = self.waiters.swap_remove_back(i).expect("checked above");
                // Notify. Ignore if receiver has dropped.
                let _ = w.1.send(());
                // Continue without increasing i: `swap_remove_back` moved the
                // last waiter into position i, and it needs to be checked too.
                continue;
            }
            i += 1;
        }
    }

    /// Appends a commit at `ts` and wakes any waiters it satisfies.
    ///
    /// # Panics
    ///
    /// Panics if `ts` is not strictly greater than the log's current max
    /// timestamp, which keeps `by_ts` strictly sorted.
    fn append(&mut self, ts: Timestamp, writes: OrderedIndexKeyWrites, write_source: WriteSource) {
        assert!(self.log.max_ts() < ts, "{:?} >= {}", self.log.max_ts(), ts);
        self.log
            .by_ts
            .push_back(Arc::new((ts, writes, write_source)));
        self.notify_waiters();
    }

    /// Returns a future that resolves once the log has advanced past the given
    /// timestamp, i.e. once its max timestamp is strictly greater than
    /// `target_ts`.
    ///
    /// The waiter is registered before this function returns, so an append
    /// that happens before the future is first polled is not missed. If the
    /// log is already past `target_ts`, the future is immediately ready. The
    /// receive result is ignored, so the future also completes if the sender
    /// is dropped.
    fn wait_for_higher_ts(&mut self, target_ts: Timestamp) -> impl Future<Output = ()> {
        // Clean up waiters that are canceled.
        self.notify_waiters();
        let receiver = if self.log.max_ts() <= target_ts {
            let (sender, receiver) = oneshot::channel();
            self.waiters.push_back((target_ts, sender));
            Some(receiver)
        } else {
            None
        };
        async move {
            if let Some(receiver) = receiver {
                _ = receiver.await;
            }
        }
    }

    /// Trims entries from the oldest end of the log. Both bounds are relative
    /// to `current_ts`:
    ///
    /// - Entries at or after `current_ts - WRITE_LOG_MIN_RETENTION_SECS` are
    ///   never trimmed, regardless of log size.
    /// - Older entries are trimmed if they are before
    ///   `current_ts - WRITE_LOG_MAX_RETENTION_SECS`, or while the log's heap
    ///   size is at least `WRITE_LOG_SOFT_MAX_SIZE_BYTES`.
    ///
    /// `purged_ts` is advanced to each removed entry's timestamp. If either
    /// subtraction fails, `Timestamp::MIN` is used for that bound.
    fn enforce_retention_policy(&mut self, current_ts: Timestamp) {
        // Exclusive upper bound of trimmable entries: anything at or after
        // this is inside the minimum retention window.
        let max_ts = current_ts
            .sub(*WRITE_LOG_MIN_RETENTION_SECS)
            .unwrap_or(Timestamp::MIN);
        // Entries before this are past the maximum retention window.
        let target_ts = current_ts
            .sub(*WRITE_LOG_MAX_RETENTION_SECS)
            .unwrap_or(Timestamp::MIN);
        while let Some((ts, ..)) = self.log.by_ts.front().map(|entry| &**entry) {
            let ts = *ts;
            // We never trim past max_ts, even if the size of the write log
            // is larger.
            if ts >= max_ts {
                break;
            }
            // Trim the log based on both target_ts and size.
            if ts >= target_ts && self.log.by_ts.heap_size() < *WRITE_LOG_SOFT_MAX_SIZE_BYTES {
                break;
            }
            self.log.purged_ts = ts;
            self.log.by_ts.pop_front();
        }
    }
}

/// WriteLog holds recent commits that have been written to persistence and
/// snapshot manager. These commits may cause OCC aborts for new commits, and
/// they may trigger subscriptions.
///
/// Entries are `Arc`s in a structurally shared `imbl::Vector`. Callers clone
/// the log while holding the mutex and then scan the clone after releasing it.
#[derive(Clone)]
struct WriteLog {
    /// `(commit_ts, writes, source)` entries in strictly increasing timestamp
    /// order, as enforced by `WriteLogManager::append`.
    by_ts: WithHeapSize<Vector<Arc<(Timestamp, OrderedIndexKeyWrites, WriteSource)>>>,
    /// The initial timestamp, or the timestamp of the newest entry removed by
    /// retention. `iter` rejects ranges starting at or before it.
    purged_ts: Timestamp,
}

impl WriteLog {
    /// Creates an empty log whose retention boundary is `initial_timestamp`.
    fn new(initial_timestamp: Timestamp) -> Self {
        Self {
            by_ts: WithHeapSize::default(),
            purged_ts: initial_timestamp,
        }
    }

    /// Timestamp of the newest entry, or `purged_ts` if the log is empty.
    fn max_ts(&self) -> Timestamp {
        match self.by_ts.back() {
            Some(entry) => entry.0,
            None => self.purged_ts,
        }
    }

    // Yields the entries with `from <= ts <= to`, oldest first. Returns an
    // error tagged with `ErrorMetadata::out_of_retention()` if
    // `from <= purged_ts`, because entries in that range may have been trimmed.
    //
    // Runtime: O((log n) + k) where n is total length of the write log and k is
    // the number of elements in the returned iterator.
    fn iter(
        &self,
        from: Timestamp,
        to: Timestamp,
    ) -> anyhow::Result<impl Iterator<Item = (&Timestamp, IterWrites<'_>, &WriteSource)> + '_> {
        anyhow::ensure!(
            from > self.purged_ts,
            anyhow::anyhow!(
                "Timestamp {from} is outside of write log retention window (minimum timestamp {})",
                self.purged_ts
            )
            .context(ErrorMetadata::out_of_retention())
        );
        // Timestamps are strictly increasing, so both an exact hit and the
        // insertion point give the first entry with ts >= from.
        let start = match self.by_ts.binary_search_by_key(&from, |entry| entry.0) {
            Ok(i) => i,
            Err(i) => i,
        };
        let iter = self.by_ts.focus().narrow(start..).into_iter();
        Ok(iter
            .map(|entry| &**entry)
            .take_while(move |(t, ..)| *t <= to)
            .map(|(ts, writes, write_source)| (ts, writes.iter(), write_source)))
    }

    /// Checks whether any logged write with a timestamp in `(reads_ts, ts]`
    /// overlaps `reads`, and returns the conflict if one is found.
    ///
    /// Errors if `reads_ts.succ()` fails, or if the range is outside the
    /// retention window (see `iter`).
    #[fastrace::trace]
    fn is_stale(
        &self,
        reads: &ReadSet,
        reads_ts: Timestamp,
        ts: Timestamp,
    ) -> anyhow::Result<Option<ConflictingReadWithWriteSource>> {
        // NOTE(review): presumably wrapped in `block_in_place` because the
        // overlap scan can be CPU-heavy; confirm against `common::runtime`.
        block_in_place(|| {
            let log_range = self.iter(reads_ts.succ()?, ts)?;
            Ok(reads.writes_overlap_index_keys(log_range))
        })
    }

    /// Tries to advance `token` to `ts` by checking the log for writes in
    /// `(token.ts(), ts]` that overlap the token's read set.
    ///
    /// Returns:
    /// - `Ok(Ok(token))` if no conflicting write was found. The token's
    ///   timestamp is advanced to `ts` if it was older.
    /// - `Ok(Err(Some(write_ts)))` if a conflicting write at `write_ts` was
    ///   found.
    /// - `Ok(Err(None))` if the range has fallen out of retention, so no
    ///   conflicting write timestamp is known.
    /// - `Err(e)` for any other error.
    fn refresh_token(
        &self,
        mut token: Token,
        ts: Timestamp,
    ) -> anyhow::Result<Result<Token, Option<Timestamp>>> {
        // Record how old the read set is (clamped at 0).
        metrics::log_read_set_age(ts.secs_since_f64(token.ts()).max(0.0));
        let result = match self.is_stale(token.reads(), token.ts(), ts) {
            Ok(Some(conflict)) => Err(Some(conflict.write_ts)),
            Err(e) if e.is_out_of_retention() => {
                metrics::log_reads_refresh_miss();
                Err(None)
            },
            Err(e) => return Err(e),
            Ok(None) => {
                if token.ts() < ts {
                    token.advance_ts(ts);
                }
                Ok(token)
            },
        };
        Ok(result)
    }
}

/// Creates a write log starting at `initial_timestamp` and returns its owner,
/// reader and writer handles, which all share the same underlying state.
pub fn new_write_log(initial_timestamp: Timestamp) -> (LogOwner, LogReader, LogWriter) {
    let log_manager = Arc::new(Mutex::new(WriteLogManager::new(initial_timestamp)));
    (
        LogOwner {
            inner: log_manager.clone(),
        },
        LogReader {
            inner: log_manager.clone(),
        },
        LogWriter { inner: log_manager },
    )
}

/// LogOwner consumes the log and is responsible for trimming it.
pub struct LogOwner {
    inner: Arc<Mutex<WriteLogManager>>,
}

impl LogOwner {
    /// Trims the log relative to `current_ts`; see
    /// `WriteLogManager::enforce_retention_policy`.
    pub fn enforce_retention_policy(&mut self, current_ts: Timestamp) {
        self.inner.lock().enforce_retention_policy(current_ts)
    }

    /// Returns a new reader handle over the same log.
    pub fn reader(&self) -> LogReader {
        LogReader {
            inner: self.inner.clone(),
        }
    }

    /// Timestamp of the newest logged commit (or the retention boundary if the
    /// log is empty).
    pub fn max_ts(&self) -> Timestamp {
        let snapshot = { self.inner.lock().log.clone() };
        snapshot.max_ts()
    }

    /// Refreshes `token` to `ts` against a snapshot of the log. The return
    /// value has the same meaning as `WriteLog::refresh_token`.
    pub fn refresh_token(
        &self,
        token: Token,
        ts: Timestamp,
    ) -> anyhow::Result<Result<Token, Option<Timestamp>>> {
        // Clone under the lock, then scan without holding it.
        let snapshot = { self.inner.lock().log.clone() };
        block_in_place(|| snapshot.refresh_token(token, ts))
    }

    /// Blocks until the log has advanced past the given timestamp, then
    /// returns the log's new max timestamp.
    ///
    /// # Panics
    ///
    /// Panics if, after waking, the max timestamp is not greater than
    /// `target_ts`.
    pub async fn wait_for_higher_ts(&mut self, target_ts: Timestamp) -> Timestamp {
        // The lock is released before awaiting; only the waiter registration
        // happens under it.
        let fut = block_in_place(|| self.inner.lock().wait_for_higher_ts(target_ts));
        fut.await;
        let result = block_in_place(|| self.inner.lock().log.max_ts());
        assert!(result > target_ts);
        result
    }

    /// Calls `f` for every logged commit with `from <= ts <= to`, oldest
    /// first. It runs on a snapshot of the log taken at call time. Errors if
    /// `from` is outside the retention window.
    pub fn for_each<F>(&self, from: Timestamp, to: Timestamp, mut f: F) -> anyhow::Result<()>
    where
        for<'a> F: FnMut(Timestamp, IterWrites<'a>),
    {
        let snapshot = { self.inner.lock().log.clone() };
        block_in_place(|| {
            for (ts, writes, _) in snapshot.iter(from, to)? {
                f(*ts, writes);
            }
            Ok(())
        })
    }
}

/// Cloneable read-only handle used to refresh tokens against the log.
#[derive(Clone)]
pub struct LogReader {
    inner: Arc<Mutex<WriteLogManager>>,
}

impl LogReader {
    /// Refreshes `token` to `ts`. The return value has the same meaning as
    /// `WriteLog::refresh_token`.
    ///
    /// Errors if `ts` is newer than the log's max timestamp.
    #[fastrace::trace]
    pub fn refresh_token(
        &self,
        token: Token,
        ts: Timestamp,
    ) -> anyhow::Result<Result<Token, Option<Timestamp>>> {
        if token.ts() == ts {
            // Nothing to do. We can return Ok even if `token.ts()` has fallen
            // out of the write log retention window.
            return Ok(Ok(token));
        }
        let snapshot = { self.inner.lock().log.clone() };
        block_in_place(|| {
            let max_ts = snapshot.max_ts();
            anyhow::ensure!(
                ts <= max_ts,
                "Can't refresh token to newer timestamp {ts} than max ts {max_ts}"
            );
            snapshot.refresh_token(token, ts)
        })
    }

    /// Refreshes `token` to the log's current max timestamp, read from the
    /// same snapshot that is used for the conflict check.
    pub fn refresh_reads_until_max_ts(
        &self,
        token: Token,
    ) -> anyhow::Result<Result<Token, Option<Timestamp>>> {
        let snapshot = { self.inner.lock().log.clone() };
        block_in_place(|| {
            let max_ts = snapshot.max_ts();
            snapshot.refresh_token(token, max_ts)
        })
    }
}

/// LogWriter can append to the log.
pub struct LogWriter {
    inner: Arc<Mutex<WriteLogManager>>,
}

impl LogWriter {
    /// Appends a commit at `ts`; panics if `ts` is not newer than the log's
    /// max timestamp.
    // N.B.: `writes` is ordered (`OrderedIndexKeyWrites`) because that's what
    // the committer already has, but the write log doesn't actually care about
    // the ordering.
    pub fn append(
        &mut self,
        ts: Timestamp,
        writes: OrderedIndexKeyWrites,
        write_source: WriteSource,
    ) {
        block_in_place(|| self.inner.lock().append(ts, writes, write_source));
    }

    /// Checks a snapshot of the log for writes in `(reads_ts, ts]` that
    /// overlap `reads`; see `WriteLog::is_stale`.
    pub fn is_stale(
        &self,
        reads: &ReadSet,
        reads_ts: Timestamp,
        ts: Timestamp,
    ) -> anyhow::Result<Option<ConflictingReadWithWriteSource>> {
        let snapshot = { self.inner.lock().log.clone() };
        block_in_place(|| snapshot.is_stale(reads, reads_ts, ts))
    }
}

/// Pending writes are used by the committer to detect conflicts between a new
/// commit and a commit that has started but has not finished writing to
/// persistence and snapshot_manager.
/// These pending writes do not conflict with each other so any subset of them
/// may be written to persistence, in any order.
pub struct PendingWrites { by_ts: BTreeMap<Timestamp, (OrderedDocumentWrites, WriteSource, Snapshot)>, persistence_version: PersistenceVersion, } impl PendingWrites { pub fn new(persistence_version: PersistenceVersion) -> Self { Self { by_ts: BTreeMap::new(), persistence_version, } } pub fn push_back( &mut self, ts: Timestamp, writes: OrderedDocumentWrites, write_source: WriteSource, snapshot: Snapshot, ) -> PendingWriteHandle { if let Some((last_ts, _)) = self.by_ts.iter().next_back() { assert!(*last_ts < ts, "{:?} >= {}", *last_ts, ts); } self.by_ts.insert(ts, (writes, write_source, snapshot)); PendingWriteHandle(Some(ts)) } pub fn latest_snapshot(&self) -> Option<Snapshot> { self.by_ts .iter() .next_back() .map(|(_, (_, _, snapshot))| snapshot.clone()) } /// Recomputes the snapshot associated with each pending write, rebasing the /// pending writes on the new base snapshot provided. pub fn recompute_pending_snapshots(&mut self, mut base_snapshot: Snapshot) { for (ts, (ordered_writes, _, snapshot)) in self.by_ts.iter_mut() { for (_id, document_update) in ordered_writes.iter() { base_snapshot .update(&document_update.unpack(), *ts) .expect("Failed to update snapshot"); } *snapshot = base_snapshot.clone(); } } pub fn iter( &self, from: Timestamp, to: Timestamp, ) -> impl Iterator< Item = ( &Timestamp, impl Iterator<Item = &(ResolvedDocumentId, PackedDocumentUpdate)>, &WriteSource, ), > { self.by_ts .range(from..=to) .map(|(ts, (w, source, _snapshot))| (ts, w.iter(), source)) } pub fn is_stale( &self, reads: &ReadSet, reads_ts: Timestamp, ts: Timestamp, ) -> anyhow::Result<Option<ConflictingReadWithWriteSource>> { Ok(reads.writes_overlap_docs(self.iter(reads_ts.succ()?, ts), self.persistence_version)) } pub fn pop_first( &mut self, mut handle: PendingWriteHandle, ) -> Option<(Timestamp, OrderedDocumentWrites, WriteSource, Snapshot)> { let first = self.by_ts.pop_first(); if let Some((ts, (writes, write_source, snapshot))) = first { if let Some(expected_ts) = handle.0 
{ if ts == expected_ts { handle.0.take(); } } Some((ts, writes, write_source, snapshot)) } else { None } } pub fn min_ts(&self) -> Option<Timestamp> { self.by_ts.first_key_value().map(|(ts, _)| *ts) } } pub struct PendingWriteHandle(Option<Timestamp>); impl PendingWriteHandle { pub fn must_commit_ts(&self) -> Timestamp { self.0.expect("pending write already committed") } } #[cfg(test)] mod tests { use common::{ self, document_index_keys::DocumentIndexKeys, index::IndexKey, interval::{ BinaryKey, End, Interval, StartIncluded, }, knobs::WRITE_LOG_MAX_RETENTION_SECS, testing::TestIdGenerator, types::{ IndexDescriptor, TabletIndexName, Timestamp, }, value::FieldPath, }; use convex_macro::test_runtime; use runtime::testing::TestRuntime; use value::val; use crate::{ reads::{ ReadSet, TransactionReadSet, }, write_log::{ DocumentIndexKeysUpdate, WriteLogManager, WriteSource, }, }; #[test] fn test_write_log() -> anyhow::Result<()> { let mut log_manager = WriteLogManager::new(Timestamp::must(1000)); assert_eq!(log_manager.log.purged_ts, Timestamp::must(1000)); assert_eq!(log_manager.log.max_ts(), Timestamp::must(1000)); for ts in (1002..=1010).step_by(2) { log_manager.append(Timestamp::must(ts), vec![].into(), WriteSource::unknown()); assert_eq!(log_manager.log.purged_ts, Timestamp::must(1000)); assert_eq!(log_manager.log.max_ts(), Timestamp::must(ts)); } assert!(log_manager .log .iter(Timestamp::must(1000), Timestamp::must(1010)) .is_err()); assert_eq!( log_manager .log .iter(Timestamp::must(1001), Timestamp::must(1010))? .map(|(ts, ..)| *ts) .collect::<Vec<_>>(), (1002..=1010) .step_by(2) .map(Timestamp::must) .collect::<Vec<_>>() ); assert_eq!( log_manager .log .iter(Timestamp::must(1004), Timestamp::must(1008))? .map(|(ts, ..)| *ts) .collect::<Vec<_>>(), (1004..=1008) .step_by(2) .map(Timestamp::must) .collect::<Vec<_>>() ); assert_eq!( log_manager .log .iter(Timestamp::must(1004), Timestamp::must(1020))? 
.map(|(ts, ..)| *ts) .collect::<Vec<_>>(), (1004..=1010) .step_by(2) .map(Timestamp::must) .collect::<Vec<_>>() ); log_manager.enforce_retention_policy( Timestamp::must(1005) .add(*WRITE_LOG_MAX_RETENTION_SECS) .unwrap(), ); assert_eq!(log_manager.log.purged_ts, Timestamp::must(1004)); assert_eq!(log_manager.log.max_ts(), Timestamp::must(1010)); assert!(log_manager .log .iter(Timestamp::must(1004), Timestamp::must(1010)) .is_err()); assert_eq!( log_manager .log .iter(Timestamp::must(1005), Timestamp::must(1010))? .map(|(ts, ..)| *ts) .collect::<Vec<_>>(), (1006..=1010) .step_by(2) .map(Timestamp::must) .collect::<Vec<_>>() ); Ok(()) } #[test_runtime] async fn test_is_stale(_rt: TestRuntime) -> anyhow::Result<()> { let mut id_generator = TestIdGenerator::new(); let mut log_manager = WriteLogManager::new(Timestamp::must(1000)); let table_id = id_generator.user_table_id(&"t".parse()?).tablet_id; let id = id_generator.user_generate(&"t".parse()?); let index_key = IndexKey::new(vec![val!(5)], id.into()); let index_key_binary: BinaryKey = index_key.to_bytes().into(); let index_name = TabletIndexName::new(table_id, IndexDescriptor::new("by_k").unwrap()).unwrap(); log_manager.append( Timestamp::must(1003), vec![( id, DocumentIndexKeysUpdate { id, old_document_keys: None, new_document_keys: Some(DocumentIndexKeys::with_standard_index_for_test( index_name.clone(), index_key.clone(), )), }, )] .into(), WriteSource::unknown(), ); let read_set = |interval: Interval| -> ReadSet { let field_path: FieldPath = "k".parse().unwrap(); let mut reads = TransactionReadSet::new(); reads .record_indexed_directly( index_name.clone(), vec![field_path].try_into().unwrap(), interval, ) .unwrap(); reads.into_read_set() }; // Write conflicts with read. let read_set_conflict = read_set(Interval::all()); assert_eq!( log_manager .log .is_stale( &read_set_conflict, Timestamp::must(1001), Timestamp::must(1004) )? .unwrap() .read .index, index_name.clone() ); // Write happened after read finished. 
assert_eq!( log_manager.log.is_stale( &read_set_conflict, Timestamp::must(1001), Timestamp::must(1002) )?, None ); // Write happened before read started. assert_eq!( log_manager.log.is_stale( &read_set_conflict, Timestamp::must(1003), Timestamp::must(1004) )?, None ); // Different intervals, some of which intersect the write. let empty_read_set = read_set(Interval::empty()); assert_eq!( log_manager.log.is_stale( &empty_read_set, Timestamp::must(1001), Timestamp::must(1004) )?, None ); let prefix_read_set = read_set(Interval::prefix(index_key_binary.clone())); assert_eq!( log_manager .log .is_stale( &prefix_read_set, Timestamp::must(1001), Timestamp::must(1004) )? .unwrap() .read .index, index_name.clone() ); let end_excluded_read_set = read_set(Interval { start: StartIncluded(BinaryKey::min()), end: End::Excluded(index_key_binary.clone()), }); assert_eq!( log_manager.log.is_stale( &end_excluded_read_set, Timestamp::must(1001), Timestamp::must(1004) )?, None ); let start_included_read_set = read_set(Interval { start: StartIncluded(index_key_binary), end: End::Unbounded, }); assert_eq!( log_manager .log .is_stale( &start_included_read_set, Timestamp::must(1001), Timestamp::must(1004) )? .unwrap() .read .index, index_name.clone() ); let mut delete_log_manager = WriteLogManager::new(Timestamp::must(1000)); delete_log_manager.append( Timestamp::must(1003), vec![( id, DocumentIndexKeysUpdate { id, old_document_keys: Some(DocumentIndexKeys::with_standard_index_for_test( index_name.clone(), index_key, )), new_document_keys: None, }, )] .into(), WriteSource::unknown(), ); assert_eq!( delete_log_manager .log .is_stale( &read_set_conflict, Timestamp::must(1001), Timestamp::must(1004) )? .unwrap() .read .index, index_name ); assert_eq!( delete_log_manager.log.is_stale( &empty_read_set, Timestamp::must(1001), Timestamp::must(1004) )?, None ); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.