writes.rs
//! Write set tracking for an active transaction

use std::{
    collections::BTreeSet,
    ops::{
        Deref,
        DerefMut,
    },
};

use anyhow::Context;
use common::{
    bootstrap_model::index::{
        database_index::IndexedFields,
        index_metadata_serialize_tablet_id,
        INDEX_BY_TABLE_ID_VIRTUAL_INDEX_DESCRIPTOR,
        TABLE_ID_FIELD_PATH,
    },
    document::{
        DocumentUpdateWithPrevTs,
        ResolvedDocument,
    },
    index::IndexKey,
    interval::{
        BinaryKey,
        Interval,
    },
    knobs::{
        TRANSACTION_MAX_NUM_USER_WRITES,
        TRANSACTION_MAX_SYSTEM_NUM_WRITES,
        TRANSACTION_MAX_SYSTEM_WRITE_SIZE_BYTES,
        TRANSACTION_MAX_USER_WRITE_SIZE_BYTES,
    },
    types::{
        TabletIndexName,
        WriteTimestamp,
    },
    value::{
        ResolvedDocumentId,
        Size,
    },
};
use errors::ErrorMetadata;
use imbl::OrdMap;
use value::{
    values_to_bytes,
    DeveloperDocumentId,
    TabletId,
};

use crate::{
    bootstrap_model::defaults::BootstrapTableIds,
    reads::TransactionReadSet,
    schema_registry::SchemaRegistry,
    ComponentRegistry,
    TableRegistry,
};

#[derive(Clone, PartialEq)]
#[cfg_attr(any(test, feature = "testing"), derive(Debug))]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct DocumentWrite {
    pub document: Option<ResolvedDocument>,
}

pub trait PendingWrites: Clone {}
impl PendingWrites for Writes {}
impl PendingWrites for TableRegistry {}
impl PendingWrites for SchemaRegistry {}
impl PendingWrites for ComponentRegistry {}

pub type NestedWriteToken = u32;

#[derive(Debug, Clone, PartialEq)]
pub struct NestedWrites<W: PendingWrites> {
    parent: Option<Box<NestedWrites<W>>>,
    pending: W,
    nested_token: NestedWriteToken,
}

impl<W: PendingWrites> Deref for NestedWrites<W> {
    type Target = W;

    fn deref(&self) -> &Self::Target {
        &self.pending
    }
}

impl<W: PendingWrites> DerefMut for NestedWrites<W> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.pending
    }
}

impl<W: PendingWrites> NestedWrites<W> {
    pub fn new(writes: W) -> Self {
        Self {
            parent: None,
            pending: writes,
            nested_token: 0,
        }
    }

    pub fn begin_nested(&mut self) -> NestedWriteToken {
        let new_pending = self.pending.clone();
        let nested_token = self.nested_token + 1;
        let new_writes = NestedWrites {
            parent: None,
            pending: new_pending,
            nested_token,
        };
        let parent = std::mem::replace(self, new_writes);
        self.parent = Some(Box::new(parent));
        nested_token
    }

    pub fn commit_nested(&mut self, token: NestedWriteToken) -> anyhow::Result<()> {
        anyhow::ensure!(
            self.nested_token == token,
            "Mismatched nested transaction token {} != {}",
            self.nested_token,
            token
        );
        let parent = self
            .parent
            .take()
            .context("No nested transaction to commit")?;
        let pending = std::mem::replace(self, *parent).pending;
        self.pending = pending;
        Ok(())
    }

    pub fn rollback_nested(&mut self, token: NestedWriteToken) -> anyhow::Result<()> {
        anyhow::ensure!(
            self.nested_token == token,
            "Mismatched nested transaction token {} != {}",
            self.nested_token,
            token
        );
        let parent = self
            .parent
            .take()
            .context("No nested transaction to rollback")?;
        *self = *parent;
        Ok(())
    }

    pub fn require_not_nested(&self) -> anyhow::Result<()> {
        anyhow::ensure!(self.parent.is_none(), "Nested transaction in progress");
        Ok(())
    }

    pub fn as_flat(&self) -> anyhow::Result<&W> {
        self.require_not_nested()?;
        Ok(&self.pending)
    }

    pub fn into_flat(self) -> anyhow::Result<W> {
        self.require_not_nested()?;
        Ok(self.pending)
    }

    pub fn pending(&mut self) -> &mut W {
        &mut self.pending
    }
}
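// A minimal sketch (not part of the test suite below) of how the nested
// write token protocol above is intended to be driven. `Writes` stands in
// for any `W: PendingWrites`; the module and test names are illustrative.
#[cfg(test)]
mod nested_writes_sketch {
    use super::{
        NestedWrites,
        Writes,
    };

    #[test]
    fn commit_and_rollback_nested() -> anyhow::Result<()> {
        let mut writes = NestedWrites::new(Writes::new());

        // `begin_nested` snapshots the current pending writes and returns a
        // token that must be presented back to commit or roll back.
        let token = writes.begin_nested();
        writes.commit_nested(token)?; // keep the child's pending writes

        let token = writes.begin_nested();
        writes.rollback_nested(token)?; // restore the parent snapshot

        // A mismatched token is rejected and leaves the nesting intact.
        let token = writes.begin_nested();
        assert!(writes.commit_nested(token + 1).is_err());
        writes.rollback_nested(token)?;
        assert!(writes.require_not_nested().is_ok());
        Ok(())
    }
}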
/// The write set for a transaction, maintained by `TransactionState`
#[derive(Debug, Clone, PartialEq)]
pub struct Writes {
    updates: OrdMap<ResolvedDocumentId, DocumentUpdateWithPrevTs>,

    // Fields below can be recomputed from `updates`.

    // Size of writes to user tables
    user_tx_size: TransactionWriteSize,
    // Size of writes to system tables
    system_tx_size: TransactionWriteSize,
}

#[derive(Clone, Debug, Default, PartialEq)]
pub struct TransactionWriteSize {
    // Total number of writes (i.e. calls to `mutate`)
    pub num_writes: usize,
    // Total size of mutations. Writing to the same DocumentId twice will
    // still count twice.
    pub size: usize,
}

impl Writes {
    /// Create an empty write set.
    pub fn new() -> Self {
        Self {
            updates: OrdMap::new(),
            user_tx_size: TransactionWriteSize::default(),
            system_tx_size: TransactionWriteSize::default(),
        }
    }

    /// Are there any writes in the active transaction?
    pub fn is_empty(&self) -> bool {
        self.updates.is_empty()
    }
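    /// Record a write of `document_id`, coalescing it with any pending write
    /// to the same id in this transaction. A sketch of the contract, inferred
    /// from the body below: `old_document` is `None` for inserts (the id is
    /// then registered as newly allocated) and `new_document` is `None` for
    /// deletes; size limits are charged per call, so writing the same
    /// document twice counts twice.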
    pub fn update(
        &mut self,
        bootstrap_tables: BootstrapTableIds,
        is_system_document: bool,
        reads: &mut TransactionReadSet,
        document_id: ResolvedDocumentId,
        old_document: Option<(ResolvedDocument, WriteTimestamp)>,
        new_document: Option<ResolvedDocument>,
    ) -> anyhow::Result<()> {
        if old_document.is_none() {
            anyhow::ensure!(!self.updates.contains_key(&document_id), "Duplicate insert");
            self.register_new_id(reads, document_id)?;
        }
        Self::record_reads_for_write(bootstrap_tables, reads, document_id.tablet_id)?;

        let id_size = document_id.size();
        let value_size = new_document.as_ref().map(|d| d.value().size()).unwrap_or(0);
        let tx_size = if is_system_document {
            &mut self.system_tx_size
        } else {
            &mut self.user_tx_size
        };
        // We always increment the size first: even if we throw, we want the
        // size to reflect the write, so that we can tell that we threw and
        // not issue a warning.
        tx_size.num_writes += 1;
        tx_size.size += id_size + value_size;

        if is_system_document {
            let tx_size = &self.system_tx_size;
            // If we exceed system limits, throw a system error, not a
            // developer one: developers have no control over system tables.
            // We should define feature-specific limits to avoid hitting the
            // system table ones if needed.
            anyhow::ensure!(
                tx_size.num_writes <= *TRANSACTION_MAX_SYSTEM_NUM_WRITES,
                "Too many system document writes in a single transaction: {}",
                tx_size.num_writes
            );
            anyhow::ensure!(
                tx_size.size <= *TRANSACTION_MAX_SYSTEM_WRITE_SIZE_BYTES,
                "Too many bytes written in system tables in a single transaction: {}",
                tx_size.size
            );
            tx_size
        } else {
            let tx_size = &self.user_tx_size;
            anyhow::ensure!(
                tx_size.num_writes <= *TRANSACTION_MAX_NUM_USER_WRITES,
                ErrorMetadata::pagination_limit(
                    "TooManyWrites",
                    format!(
                        "Too many writes in a single function execution (limit: {})",
                        *TRANSACTION_MAX_NUM_USER_WRITES,
                    )
                ),
            );
            anyhow::ensure!(
                tx_size.size <= *TRANSACTION_MAX_USER_WRITE_SIZE_BYTES,
                ErrorMetadata::pagination_limit(
                    "TooManyBytesWritten",
                    format!(
                        "Too many bytes written in a single function execution (limit: {} bytes)",
                        *TRANSACTION_MAX_USER_WRITE_SIZE_BYTES,
                    )
                ),
            );
            tx_size
        };

        if let Some(old_update) = self.updates.get_mut(&document_id) {
            let (old_document, old_document_ts) = old_document.unzip();
            anyhow::ensure!(
                old_update.new_document == old_document,
                "Inconsistent update: The old update's new document does not match the new \
                 update's old document"
            );
            anyhow::ensure!(
                [None, Some(WriteTimestamp::Pending)].contains(&old_document_ts),
                "Inconsistent update: The new document's old update timestamp should be Pending \
                 but is {:?}",
                old_document_ts
            );
            old_update.new_document = new_document;
        } else {
            self.updates.insert(
                document_id,
                DocumentUpdateWithPrevTs {
                    id: document_id,
                    old_document: match old_document {
                        Some((d, ts)) => Some((
                            d,
                            match ts {
                                WriteTimestamp::Committed(ts) => ts,
                                WriteTimestamp::Pending => anyhow::bail!(
                                    "Old document timestamp is Pending, but there is no pending \
                                     write"
                                ),
                            },
                        )),
                        None => None,
                    },
                    new_document,
                },
            );
        }
        Ok(())
    }
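    /// Take the read dependencies implied by a write to `tablet_id` (a
    /// summary of the logic below): writes to `_tables` or `_index` read all
    /// table and index metadata, while writes to an ordinary table read that
    /// table's `_tables` row and all of its index records, so concurrent
    /// schema or index changes conflict via OCC.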
    fn record_reads_for_write(
        table_mapping: BootstrapTableIds,
        reads: &mut TransactionReadSet,
        tablet_id: TabletId,
    ) -> anyhow::Result<()> {
        // by_name index on _indexes table.
        if table_mapping.is_index_table(tablet_id) || table_mapping.is_tables_table(tablet_id) {
            // Changes in _tables or _index cannot race with any other table or
            // index. This is because TableRegistry and IndexRegistry check a
            // number of invariants between tables and index records.
            // TODO(presley): This is probably the wrong layer to add this
            // dependency. It should be added by TableRegistry and
            // IndexRegistry themselves. For example, fast-forwarding a vector
            // search checkpoint does not need this dependency.
            reads.record_indexed_derived(
                TabletIndexName::by_id(table_mapping.tables_id.tablet_id),
                IndexedFields::by_id(),
                Interval::all(),
            );
            reads.record_indexed_derived(
                TabletIndexName::by_id(table_mapping.index_id.tablet_id),
                IndexedFields::by_id(),
                Interval::all(),
            );
        } else {
            // Writes to a table require that the table still exists.
            let table_id_bytes = IndexKey::new(
                vec![],
                DeveloperDocumentId::new(table_mapping.tables_id.table_number, tablet_id.0),
            )
            .to_bytes();
            reads.record_indexed_derived(
                TabletIndexName::by_id(table_mapping.tables_id.tablet_id),
                IndexedFields::by_id(),
                Interval::prefix(table_id_bytes.into()),
            );

            // Inserts or updates also need all of the indexes they touch to
            // be stable. Thus we take a read dependency on all indexes for
            // that table_id.
            // TODO(presley): The _index.by_table_id index does not really
            // exist. Pretend it does, since evaluating read dependencies does
            // not actually need to read the index. We only care about the
            // name always mapping to the same fields.
            let tablet_id_bytes =
                values_to_bytes(&[Some(index_metadata_serialize_tablet_id(&tablet_id)?)]);
            reads.record_indexed_derived(
                TabletIndexName::new(
                    table_mapping.index_id.tablet_id,
                    INDEX_BY_TABLE_ID_VIRTUAL_INDEX_DESCRIPTOR.clone(),
                )?,
                vec![TABLE_ID_FIELD_PATH.clone()].try_into()?,
                // Note: this should really be an exact point rather than a
                // prefix, but our read set intervals do not support that.
                Interval::prefix(BinaryKey::from(tablet_id_bytes)),
            );
        };
        Ok(())
    }

    /// Register a newly allocated DocumentId.
    /// This enables us to check for reuse on commit.
    pub(crate) fn register_new_id(
        &mut self,
        reads: &mut TransactionReadSet,
        document_id: ResolvedDocumentId,
    ) -> anyhow::Result<()> {
        // New ID creation requires the ID to have never existed before.
        // We check in CommitterClient that it never existed before the
        // transaction's begin timestamp, and here we take a dependency on the
        // ID to make sure it cannot be created by a parallel commit.
        let index_name = TabletIndexName::by_id(document_id.tablet_id);
        let id_bytes = IndexKey::new(vec![], document_id.into()).to_bytes();
        reads.record_indexed_derived(
            index_name,
            IndexedFields::by_id(),
            Interval::prefix(id_bytes.into()),
        );
        Ok(())
    }

    /// How large is the given write transaction?
    pub fn user_size(&self) -> &TransactionWriteSize {
        &self.user_tx_size
    }

    pub fn system_size(&self) -> &TransactionWriteSize {
        &self.system_tx_size
    }

    /// Iterate over the coalesced writes (so no `DocumentId` appears twice).
    pub fn coalesced_writes(
        &self,
    ) -> impl Iterator<Item = (&ResolvedDocumentId, &DocumentUpdateWithPrevTs)> {
        self.updates.iter()
    }

    pub fn into_coalesced_writes(
        self,
    ) -> impl Iterator<Item = (ResolvedDocumentId, DocumentUpdateWithPrevTs)> {
        self.updates.into_iter()
    }

    pub fn into_updates(self) -> OrdMap<ResolvedDocumentId, DocumentUpdateWithPrevTs> {
        self.updates
    }
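    /// The ids newly allocated by this transaction, i.e. updates with no
    /// prior document.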
    pub fn generated_ids(&self) -> BTreeSet<ResolvedDocumentId> {
        self.updates
            .iter()
            .filter(|(_, update)| update.old_document.is_none())
            .map(|(id, _)| *id)
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use common::{
        bootstrap_model::{
            index::{
                database_index::IndexedFields,
                IndexMetadata,
                INDEX_TABLE,
            },
            tables::TableMetadata,
        },
        document::{
            CreationTime,
            DocumentUpdateWithPrevTs,
            PackedDocument,
            ResolvedDocument,
        },
        testing::TestIdGenerator,
        types::{
            IndexDescriptor,
            PersistenceVersion,
            TabletIndexName,
            WriteTimestamp,
        },
    };
    use maplit::btreeset;
    use sync_types::Timestamp;
    use value::{
        assert_obj,
        TableNamespace,
    };

    use super::Writes;
    use crate::{
        bootstrap_model::defaults::BootstrapTableIds,
        reads::TransactionReadSet,
    };

    #[test]
    fn test_write_read_dependencies() -> anyhow::Result<()> {
        // Create table mapping.
        let mut id_generator = TestIdGenerator::new();
        let user_table1 = id_generator.user_table_id(&"user_table1".parse()?);
        let user_table2 = id_generator.user_table_id(&"user_table2".parse()?);
        let bootstrap_tables = BootstrapTableIds::new(&id_generator);

        // Writes to a table should OCC with modification of the table
        // metadata or an index of the same table.
        let mut user_table1_write = TransactionReadSet::new();
        Writes::record_reads_for_write(
            bootstrap_tables,
            &mut user_table1_write,
            user_table1.tablet_id,
        )?;
        let user_table1_table_metadata_change = PackedDocument::pack(&ResolvedDocument::new(
            bootstrap_tables.table_resolved_doc_id(user_table1.tablet_id),
            CreationTime::ONE,
            TableMetadata::new(
                TableNamespace::test_user(),
                "big_table".parse()?,
                user_table1.table_number,
            )
            .try_into()?,
        )?);
        assert!(user_table1_write
            .read_set()
            .overlaps_document_for_test(
                &user_table1_table_metadata_change,
                PersistenceVersion::default()
            )
            .is_some());
        let user_table1_index_change = PackedDocument::pack(&ResolvedDocument::new(
            id_generator.system_generate(&INDEX_TABLE),
            CreationTime::ONE,
            IndexMetadata::new_backfilling(
                Timestamp::MIN,
                TabletIndexName::new(user_table1.tablet_id, IndexDescriptor::new("by_likes")?)?,
                IndexedFields::by_id(),
            )
            .try_into()?,
        )?);
        assert!(user_table1_write
            .read_set()
            .overlaps_document_for_test(&user_table1_index_change, PersistenceVersion::default())
            .is_some());

        // Writes to a table should *not* OCC with modification of the table
        // metadata or an index of an unrelated table.
        let user_table2_table_metadata_change = PackedDocument::pack(&ResolvedDocument::new(
            bootstrap_tables.table_resolved_doc_id(user_table2.tablet_id),
            CreationTime::ONE,
            TableMetadata::new(
                TableNamespace::test_user(),
                "small_table".parse()?,
                user_table2.table_number,
            )
            .try_into()?,
        )?);
        assert!(user_table1_write
            .read_set()
            .overlaps_document_for_test(
                &user_table2_table_metadata_change,
                PersistenceVersion::default()
            )
            .is_none());
        let user_table2_index_change = PackedDocument::pack(&ResolvedDocument::new(
            id_generator.system_generate(&INDEX_TABLE),
            CreationTime::ONE,
            IndexMetadata::new_backfilling(
                Timestamp::MIN,
                TabletIndexName::new(user_table2.tablet_id, IndexDescriptor::new("by_likes")?)?,
                IndexedFields::by_id(),
            )
            .try_into()?,
        )?);
        assert!(user_table1_write
            .read_set()
            .overlaps_document_for_test(&user_table2_index_change, PersistenceVersion::default())
            .is_none());

        // Changes to any index metadata should conflict with changes to any
        // other table or index metadata.
        let mut metadata_write = TransactionReadSet::new();
        let index_table_id = bootstrap_tables.index_id;
        Writes::record_reads_for_write(
            bootstrap_tables,
            &mut metadata_write,
            index_table_id.tablet_id,
        )?;
        assert!(metadata_write
            .read_set()
            .overlaps_document_for_test(
                &user_table1_table_metadata_change,
                PersistenceVersion::default()
            )
            .is_some());
        assert!(metadata_write
            .read_set()
            .overlaps_document_for_test(&user_table1_index_change, PersistenceVersion::default())
            .is_some());
        assert!(metadata_write
            .read_set()
            .overlaps_document_for_test(
                &user_table2_table_metadata_change,
                PersistenceVersion::default()
            )
            .is_some());
        assert!(metadata_write
            .read_set()
            .overlaps_document_for_test(&user_table2_index_change, PersistenceVersion::default())
            .is_some());
        Ok(())
    }

    #[test]
    fn test_register_new_id() -> anyhow::Result<()> {
        let mut id_generator = TestIdGenerator::new();
        let table_name = "table".parse()?;
        let _ = id_generator.user_table_id(&table_name);
        let bootstrap_tables = BootstrapTableIds::new(&id_generator);

        let mut writes = Writes::new();
        let mut reads = TransactionReadSet::new();
        let id = id_generator.user_generate(&table_name);
        let document =
            ResolvedDocument::new(id, CreationTime::ONE, assert_obj!("hello" => "world"))?;
        writes.update(
            bootstrap_tables,
            false,
            &mut reads,
            id,
            None,
            Some(document),
        )?;
        assert_eq!(writes.generated_ids(), btreeset! {id});
        Ok(())
    }
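    // A companion sketch (illustrative, mirroring the tests around it):
    // inserting the same id twice with no prior document should trip the
    // "Duplicate insert" check in `update`.
    #[test]
    fn test_duplicate_insert_rejected() -> anyhow::Result<()> {
        let mut id_generator = TestIdGenerator::new();
        let table_name = "table".parse()?;
        let _ = id_generator.user_table_id(&table_name);
        let bootstrap_tables = BootstrapTableIds::new(&id_generator);

        let mut writes = Writes::new();
        let mut reads = TransactionReadSet::new();
        let id = id_generator.user_generate(&table_name);
        let document =
            ResolvedDocument::new(id, CreationTime::ONE, assert_obj!("hello" => "world"))?;
        writes.update(
            bootstrap_tables,
            false,
            &mut reads,
            id,
            None,
            Some(document.clone()),
        )?;
        // A second insert (old_document == None) of the same id must fail.
        assert!(writes
            .update(bootstrap_tables, false, &mut reads, id, None, Some(document))
            .is_err());
        Ok(())
    }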
    #[test]
    fn test_document_updates_are_combined() -> anyhow::Result<()> {
        let mut id_generator = TestIdGenerator::new();
        let table_name = "table".parse()?;
        let _ = id_generator.user_table_id(&table_name);
        let bootstrap_tables = BootstrapTableIds::new(&id_generator);

        let mut writes = Writes::new();
        let mut reads = TransactionReadSet::new();
        let id = id_generator.user_generate(&table_name);
        let old_document = ResolvedDocument::new(id, CreationTime::ONE, assert_obj!())?;
        let new_document =
            ResolvedDocument::new(id, CreationTime::ONE, assert_obj!("hello" => "world"))?;
        writes.update(
            bootstrap_tables,
            false,
            &mut reads,
            id,
            Some((
                old_document.clone(),
                WriteTimestamp::Committed(Timestamp::must(123)),
            )),
            Some(new_document.clone()),
        )?;
        let newer_document = ResolvedDocument::new(
            id,
            CreationTime::ONE,
            assert_obj!("hello" => "world", "foo" => "bar"),
        )?;
        writes.update(
            bootstrap_tables,
            false,
            &mut reads,
            id,
            Some((new_document, WriteTimestamp::Pending)),
            Some(newer_document.clone()),
        )?;
        assert_eq!(writes.updates.len(), 1);
        assert_eq!(
            writes.updates.get_min().unwrap(),
            &(
                id,
                DocumentUpdateWithPrevTs {
                    id,
                    old_document: Some((old_document, Timestamp::must(123))),
                    new_document: Some(newer_document),
                }
            )
        );
        assert_eq!(writes.generated_ids(), btreeset! {});
        Ok(())
    }
}
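// A further sketch (hypothetical, reusing the same test helpers as the module
// above): write-size accounting charges every call to `update`, even when the
// writes coalesce to a single entry.
#[cfg(test)]
mod write_size_sketch {
    use common::{
        document::{
            CreationTime,
            ResolvedDocument,
        },
        testing::TestIdGenerator,
        types::WriteTimestamp,
    };
    use value::assert_obj;

    use super::Writes;
    use crate::{
        bootstrap_model::defaults::BootstrapTableIds,
        reads::TransactionReadSet,
    };

    #[test]
    fn num_writes_counts_every_update() -> anyhow::Result<()> {
        let mut id_generator = TestIdGenerator::new();
        let table_name = "table".parse()?;
        let _ = id_generator.user_table_id(&table_name);
        let bootstrap_tables = BootstrapTableIds::new(&id_generator);

        let mut writes = Writes::new();
        let mut reads = TransactionReadSet::new();
        let id = id_generator.user_generate(&table_name);
        let document = ResolvedDocument::new(id, CreationTime::ONE, assert_obj!("k" => "v"))?;
        // Insert, then delete the same document in one transaction.
        writes.update(
            bootstrap_tables,
            false,
            &mut reads,
            id,
            None,
            Some(document.clone()),
        )?;
        writes.update(
            bootstrap_tables,
            false,
            &mut reads,
            id,
            Some((document, WriteTimestamp::Pending)),
            None,
        )?;
        // The two calls coalesce into one update but are both charged.
        assert_eq!(writes.user_size().num_writes, 2);
        assert_eq!(writes.coalesced_writes().count(), 1);
        Ok(())
    }
}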
