lib.rs
#![feature(coroutines)]
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
#![feature(type_alias_impl_trait)]
#![feature(let_chains)]
#![feature(impl_trait_in_assoc_type)]
#![feature(try_blocks)]

mod chunks;
mod connection;
mod metrics;
mod sql;
#[cfg(test)]
mod tests;

use std::{
    cmp,
    collections::{BTreeMap, BTreeSet, HashMap},
    iter,
    ops::{Bound, Deref},
    sync::{
        atomic::{AtomicBool, Ordering::SeqCst},
        Arc,
    },
    time::{SystemTime, UNIX_EPOCH},
};

use anyhow::Context;
use async_trait::async_trait;
use chunks::ApproxSize;
use common::{
    document::{InternalId, ResolvedDocument},
    errors::lease_lost_error,
    heap_size::HeapSize,
    index::{IndexEntry, IndexKeyBytes, SplitKey, MAX_INDEX_KEY_PREFIX_LEN},
    interval::Interval,
    knobs::{
        MYSQL_MAX_QUERY_BATCH_SIZE,
        MYSQL_MAX_QUERY_DYNAMIC_BATCH_SIZE,
        MYSQL_MIN_QUERY_BATCH_SIZE,
    },
    persistence::{
        ConflictStrategy,
        DocumentLogEntry,
        DocumentPrevTsQuery,
        DocumentRevisionStream,
        DocumentStream,
        IndexStream,
        LatestDocument,
        Persistence,
        PersistenceGlobalKey,
        PersistenceIndexEntry,
        PersistenceReader,
        PersistenceTableSize,
        RetentionValidator,
        TimestampRange,
    },
    persistence_helpers::{DocumentRevision, RevisionPair},
    query::Order,
    runtime::Runtime,
    sha256::Sha256,
    shutdown::ShutdownSignal,
    types::{IndexId, PersistenceVersion, Timestamp},
    value::{ConvexValue, InternalDocumentId, TabletId},
};
pub use connection::ConvexMySqlPool;
use connection::{MySqlConnection, MySqlTransaction};
use fastrace::prelude::*;
use futures::{
    pin_mut,
    stream::{StreamExt, TryStreamExt},
};
use futures_async_stream::try_stream;
use metrics::write_persistence_global_timer;
use mysql_async::Row;
use serde::Deserialize;
use serde_json::Value as JsonValue;
use smallvec::SmallVec;

use crate::{
    chunks::smart_chunks,
    metrics::{log_prev_revisions_row_read, QueryIndexStats},
};

#[derive(Clone, Debug)]
pub struct MySqlInstanceName {
    raw: String,
}

impl Deref for MySqlInstanceName {
    type Target = str;

    fn deref(&self) -> &Self::Target {
        &self.raw
    }
}

impl<T: ToString> From<T> for MySqlInstanceName {
    fn from(raw: T) -> Self {
        Self::new(raw.to_string())
    }
}

impl MySqlInstanceName {
    pub fn new(raw: String) -> Self {
        Self { raw }
    }
}
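// `MySqlInstanceName` derefs to `str` and can be built from anything that
// implements `ToString`. A minimal illustration (the instance name here is
// hypothetical):
//
// ```ignore
// let name: MySqlInstanceName = "my-instance".into();
// assert_eq!(&*name, "my-instance");
// ```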
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
enum BoundType {
    Unbounded,
    Included,
    Excluded,
}

pub struct MySqlPersistence<RT: Runtime> {
    newly_created: AtomicBool,
    lease: Lease<RT>,

    // Used by the reader.
    read_pool: Arc<ConvexMySqlPool<RT>>,
    db_name: String,

    version: PersistenceVersion,
    instance_name: MySqlInstanceName,
    multitenant: bool,
}

#[derive(thiserror::Error, Debug)]
pub enum ConnectError {
    #[error("persistence is read-only, data migration in progress")]
    ReadOnly,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

#[derive(Clone, Debug)]
pub struct MySqlOptions {
    pub allow_read_only: bool,
    pub version: PersistenceVersion,
    pub instance_name: MySqlInstanceName,
    pub multitenant: bool,
}

#[derive(Debug)]
pub struct MySqlReaderOptions {
    pub db_should_be_leader: bool,
    pub version: PersistenceVersion,
    pub instance_name: MySqlInstanceName,
    pub multitenant: bool,
}
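// A minimal construction sketch (illustrative only; `pool`, `version`, and
// `shutdown` are assumed to exist and are not defined in this file):
//
// ```ignore
// let options = MySqlOptions {
//     allow_read_only: false,
//     version,
//     instance_name: "my-instance".into(),
//     multitenant: false,
// };
// let persistence =
//     MySqlPersistence::new(pool, "convex".to_string(), options, shutdown).await?;
// ```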
impl<RT: Runtime> MySqlPersistence<RT> {
    pub async fn new(
        pool: Arc<ConvexMySqlPool<RT>>,
        db_name: String,
        options: MySqlOptions,
        lease_lost_shutdown: ShutdownSignal,
    ) -> Result<Self, ConnectError> {
        let newly_created = {
            let mut client = pool.acquire("init_sql", &db_name).await?;
            let table_count: usize = client
                .query_optional(sql::GET_TABLE_COUNT, vec![(&db_name).into()])
                .await?
                .context("GET_TABLE_COUNT query returned no rows?")?
                .get(0)
                .context("GET_TABLE_COUNT query returned zero columns?")?;
            // Only run INIT_SQL if we have fewer tables than we expect. We suspect
            // CREATE TABLE IF NOT EXISTS is creating lock contention due to acquiring
            // an exclusive lock https://bugs.mysql.com/bug.php?id=63144.
            if table_count < sql::EXPECTED_TABLE_COUNT {
                tracing::info!("Initializing MySQL Persistence...");
                client
                    .execute_many(sql::init_sql(options.multitenant))
                    .await?;
            } else {
                tracing::info!("MySQL Persistence already initialized");
            }
            client
                .exec_iter(
                    sql::init_lease(options.multitenant),
                    if options.multitenant {
                        vec![(&options.instance_name.raw).into()]
                    } else {
                        vec![]
                    },
                )
                .await?;
            Self::check_newly_created(&mut client, options.multitenant, &options.instance_name)
                .await?
        };
        let mut client = pool.acquire("read_only", &db_name).await?;
        if !options.allow_read_only
            && Self::is_read_only(&mut client, options.multitenant, &options.instance_name).await?
        {
            return Err(ConnectError::ReadOnly);
        }
        let lease = Lease::acquire(
            pool.clone(),
            db_name.clone(),
            options.instance_name.clone(),
            options.multitenant,
            lease_lost_shutdown,
        )
        .await?;
        Ok(Self {
            newly_created: newly_created.into(),
            lease,
            read_pool: pool,
            db_name,
            version: options.version,
            instance_name: options.instance_name,
            multitenant: options.multitenant,
        })
    }

    pub fn new_reader(
        pool: Arc<ConvexMySqlPool<RT>>,
        db_name: String,
        options: MySqlReaderOptions,
    ) -> MySqlReader<RT> {
        MySqlReader {
            db_name,
            read_pool: pool,
            db_should_be_leader: options.db_should_be_leader,
            version: options.version,
            instance_name: options.instance_name,
            multitenant: options.multitenant,
        }
    }

    async fn is_read_only(
        client: &mut MySqlConnection<'_>,
        multitenant: bool,
        instance_name: &MySqlInstanceName,
    ) -> anyhow::Result<bool> {
        let mut params = vec![];
        if multitenant {
            params.push((&instance_name.raw).into());
        }
        Ok(client
            .query_optional(sql::check_is_read_only(multitenant), params)
            .await?
            .is_some())
    }

    async fn check_newly_created(
        client: &mut MySqlConnection<'_>,
        multitenant: bool,
        instance_name: &MySqlInstanceName,
    ) -> anyhow::Result<bool> {
        let mut params = vec![];
        if multitenant {
            params.push((&instance_name.raw).into());
        }
        Ok(client
            .query_optional(sql::check_newly_created(multitenant), params)
            .await?
            .is_none())
    }

    #[cfg(test)]
    pub(crate) async fn get_table_count(&self) -> anyhow::Result<usize> {
        let mut client = self
            .read_pool
            .acquire("get_table_count", &self.db_name)
            .await?;
        client
            .query_optional(sql::GET_TABLE_COUNT, vec![(&self.db_name).into()])
            .await?
            .context("GET_TABLE_COUNT query returned no rows?")?
            .get(0)
            .context("GET_TABLE_COUNT query returned zero columns?")
    }
}
#[async_trait]
impl<RT: Runtime> Persistence for MySqlPersistence<RT> {
    fn is_fresh(&self) -> bool {
        self.newly_created.load(SeqCst)
    }

    fn reader(&self) -> Arc<dyn PersistenceReader> {
        Arc::new(MySqlReader {
            db_name: self.db_name.clone(),
            read_pool: self.read_pool.clone(),
            db_should_be_leader: true,
            version: self.version,
            instance_name: self.instance_name.clone(),
            multitenant: self.multitenant,
        })
    }

    #[fastrace::trace]
    async fn write<'a>(
        &self,
        documents: &'a [DocumentLogEntry],
        indexes: &'a [PersistenceIndexEntry],
        conflict_strategy: ConflictStrategy,
    ) -> anyhow::Result<()> {
        anyhow::ensure!(documents.len() <= sql::MAX_INSERT_SIZE);
        let mut write_size = 0;
        for update in documents {
            match &update.value {
                Some(doc) => {
                    anyhow::ensure!(update.id == doc.id_with_table_id());
                    write_size += doc.heap_size();
                },
                None => {},
            }
        }
        metrics::log_write_bytes(write_size);
        metrics::log_write_documents(documents.len());
        LocalSpan::add_event(Event::new("write_to_persistence_size").with_properties(|| {
            [
                ("num_documents", documents.len().to_string()),
                ("write_size", write_size.to_string()),
            ]
        }));
        // Clear the flag eagerly; the write below might still fail without
        // changing anything.
        self.newly_created.store(false, SeqCst);
        let cluster_name = self.read_pool.cluster_name().to_owned();
        let multitenant = self.multitenant;
        let instance_name = mysql_async::Value::from(&self.instance_name.raw);
        self.lease
            .transact(async move |tx| {
                // First, process all of the full document chunks.
                let mut document_chunks = smart_chunks(documents);
                for chunk in &mut document_chunks {
                    let chunk_bytes: usize = chunk.iter().map(|item| item.approx_size()).sum();
                    let insert_chunk_query = match conflict_strategy {
                        ConflictStrategy::Error => {
                            sql::insert_document_chunk(chunk.len(), multitenant)
                        },
                        ConflictStrategy::Overwrite => {
                            sql::insert_overwrite_document_chunk(chunk.len(), multitenant)
                        },
                    };
                    let mut insert_document_chunk = Vec::with_capacity(
                        chunk.len() * (sql::INSERT_DOCUMENT_COLUMN_COUNT + (multitenant as usize)),
                    );
                    for update in chunk {
                        if multitenant {
                            insert_document_chunk.push(instance_name.clone());
                        }
                        insert_document_chunk = document_params(
                            insert_document_chunk,
                            update.ts,
                            update.id,
                            update.value.clone(),
                            update.prev_ts,
                        )?;
                    }
                    let future = async {
                        let timer = metrics::insert_document_chunk_timer(cluster_name.as_str());
                        tx.exec_drop(insert_chunk_query, insert_document_chunk)
                            .await?;
                        timer.finish();
                        LocalSpan::add_event(Event::new("document_smart_chunks").with_properties(
                            || {
                                [
                                    ("chunk_length", chunk.len().to_string()),
                                    ("chunk_bytes", chunk_bytes.to_string()),
                                ]
                            },
                        ));
                        Ok::<_, anyhow::Error>(())
                    };
                    future
                        .in_span(Span::enter_with_local_parent(format!(
                            "{}::document_chunk_write",
                            func_path!()
                        )))
                        .await?;
                }
                let mut index_chunks = smart_chunks(indexes);
                for chunk in &mut index_chunks {
                    let chunk_bytes: usize = chunk.iter().map(|item| item.approx_size()).sum();
                    let insert_chunk_query = sql::insert_index_chunk(chunk.len(), multitenant);
                    let insert_overwrite_chunk_query =
                        sql::insert_overwrite_index_chunk(chunk.len(), multitenant);
                    let insert_index_chunk = match conflict_strategy {
                        ConflictStrategy::Error => &insert_chunk_query,
                        ConflictStrategy::Overwrite => &insert_overwrite_chunk_query,
                    };
                    let mut insert_index_chunk_params = Vec::with_capacity(
                        chunk.len() * (sql::INSERT_INDEX_COLUMN_COUNT + (multitenant as usize)),
                    );
                    for update in chunk {
                        if multitenant {
                            insert_index_chunk_params.push(instance_name.clone());
                        }
                        index_params(&mut insert_index_chunk_params, update);
                    }
                    let future = async {
                        let timer = metrics::insert_index_chunk_timer(cluster_name.as_str());
                        tx.exec_drop(insert_index_chunk, insert_index_chunk_params)
                            .await?;
                        timer.finish();
                        LocalSpan::add_event(Event::new("index_smart_chunks").with_properties(
                            || {
                                [
                                    ("chunk_length", chunk.len().to_string()),
                                    ("chunk_bytes", chunk_bytes.to_string()),
                                ]
                            },
                        ));
                        Ok::<_, anyhow::Error>(())
                    };
                    future
                        .in_span(Span::enter_with_local_parent(format!(
                            "{}::index_chunk_write",
                            func_path!()
                        )))
                        .await?;
                }
                Ok(())
            })
            .await
    }
    async fn set_read_only(&self, read_only: bool) -> anyhow::Result<()> {
        let multitenant = self.multitenant;
        let instance_name = mysql_async::Value::from(&self.instance_name.raw);
        let params = if multitenant { vec![instance_name] } else { vec![] };
        self.lease
            .transact(async move |tx| {
                let statement = if read_only {
                    sql::set_read_only(multitenant)
                } else {
                    sql::unset_read_only(multitenant)
                };
                tx.exec_drop(statement, params).await?;
                Ok(())
            })
            .await
    }

    async fn write_persistence_global(
        &self,
        key: PersistenceGlobalKey,
        value: JsonValue,
    ) -> anyhow::Result<()> {
        let timer = write_persistence_global_timer(self.read_pool.cluster_name());
        let multitenant = self.multitenant;
        let instance_name = mysql_async::Value::from(&self.instance_name.raw);
        self.lease
            .transact(async move |tx| {
                let stmt = sql::write_persistence_global(multitenant);
                let mut params = if multitenant { vec![instance_name] } else { vec![] };
                params.extend([String::from(key).into(), value.into()]);
                tx.exec_drop(stmt, params).await?;
                Ok(())
            })
            .await?;
        timer.finish();
        Ok(())
    }

    async fn load_index_chunk(
        &self,
        cursor: Option<IndexEntry>,
        chunk_size: usize,
    ) -> anyhow::Result<Vec<IndexEntry>> {
        let mut client = self
            .read_pool
            .acquire("load_index_chunk", &self.db_name)
            .await?;
        let stmt = sql::load_indexes_page(self.multitenant);
        let mut params = MySqlReader::<RT>::_index_cursor_params(cursor.as_ref());
        if self.multitenant {
            params.push(self.instance_name.to_string().into());
        }
        params.push((chunk_size as i64).into());
        let row_stream = client.query_stream(stmt, params, chunk_size).await?;
        let parsed = row_stream.map(|row| parse_row(&row?));
        parsed.try_collect().await
    }

    async fn delete_index_entries(
        &self,
        expired_entries: Vec<IndexEntry>,
    ) -> anyhow::Result<usize> {
        let multitenant = self.multitenant;
        let instance_name = mysql_async::Value::from(&self.instance_name.raw);
        self.lease
            .transact(async move |tx| {
                let mut deleted_count = 0;
                for chunk in smart_chunks(&expired_entries) {
                    let mut params = Vec::with_capacity(
                        chunk.len() * (sql::DELETE_INDEX_COLUMN_COUNT + (multitenant as usize)),
                    );
                    for index_entry in chunk.iter() {
                        MySqlReader::<RT>::_index_delete_params(&mut params, index_entry);
                        if multitenant {
                            params.push(instance_name.clone());
                        }
                    }
                    deleted_count += tx
                        .exec_iter(sql::delete_index_chunk(chunk.len(), multitenant), params)
                        .await?;
                }
                Ok(deleted_count as usize)
            })
            .await
    }

    async fn delete(
        &self,
        documents: Vec<(Timestamp, InternalDocumentId)>,
    ) -> anyhow::Result<usize> {
        let multitenant = self.multitenant;
        let instance_name = mysql_async::Value::from(&self.instance_name.raw);
        self.lease
            .transact(async move |tx| {
                let mut deleted_count = 0;
                for chunk in smart_chunks(&documents) {
                    let mut params = Vec::with_capacity(
                        chunk.len() * (sql::DELETE_DOCUMENT_COLUMN_COUNT + (multitenant as usize)),
                    );
                    for doc in chunk.iter() {
                        MySqlReader::<RT>::_document_delete_params(&mut params, doc);
                        if multitenant {
                            params.push(instance_name.clone());
                        }
                    }
                    deleted_count += tx
                        .exec_iter(sql::delete_document_chunk(chunk.len(), multitenant), params)
                        .await?;
                }
                Ok(deleted_count as usize)
            })
            .await
    }
}
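// Writes funnel through the lease so a stale writer cannot commit. A minimal
// usage sketch (illustrative only; `documents` and `indexes` are assumed to be
// prepared elsewhere):
//
// ```ignore
// persistence
//     .write(&documents, &indexes, ConflictStrategy::Error)
//     .await?;
// ```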
#[derive(Clone)]
pub struct MySqlReader<RT: Runtime> {
    read_pool: Arc<ConvexMySqlPool<RT>>,
    db_name: String,
    instance_name: MySqlInstanceName,
    multitenant: bool,

    /// Set `db_should_be_leader` if this MySqlReader should be connected
    /// to the database leader. In particular, we protect against heterogeneous
    /// connection pools where one connection is to the leader and another is
    /// to a follower.
    #[allow(unused)]
    db_should_be_leader: bool,

    version: PersistenceVersion,
}

impl<RT: Runtime> MySqlReader<RT> {
    fn initial_id_param(order: Order) -> Vec<u8> {
        match order {
            Order::Asc => InternalId::BEFORE_ALL_BYTES.to_vec(),
            Order::Desc => InternalId::AFTER_ALL_BYTES.to_vec(),
        }
    }

    fn row_to_document(
        &self,
        row: Row,
    ) -> anyhow::Result<(
        Timestamp,
        InternalDocumentId,
        Option<ResolvedDocument>,
        Option<Timestamp>,
    )> {
        let (ts, id, doc, prev_ts) = self.row_to_document_inner(row)?;
        Ok((ts, id, doc, prev_ts))
    }

    fn row_to_document_inner(
        &self,
        row: Row,
    ) -> anyhow::Result<(
        Timestamp,
        InternalDocumentId,
        Option<ResolvedDocument>,
        Option<Timestamp>,
    )> {
        let bytes: Vec<u8> = row.get(0).unwrap();
        let internal_id = InternalId::try_from(bytes)?;
        let ts: i64 = row.get(1).unwrap();
        let ts = Timestamp::try_from(ts)?;
        let table_b: Vec<u8> = row.get(2).unwrap();
        let json_value: Vec<u8> = row.get(3).unwrap();
        let json_value: JsonValue = serde_json::from_slice(&json_value)?;
        let deleted: bool = row.get(4).unwrap();
        let table = TabletId(table_b.try_into()?);
        let document_id = InternalDocumentId::new(table, internal_id);
        let document = if !deleted {
            let value: ConvexValue = json_value.try_into()?;
            Some(ResolvedDocument::from_database(table, value)?)
        } else {
            None
        };
        let prev_ts: Option<i64> = row.get(5).unwrap();
        let prev_ts = prev_ts.map(Timestamp::try_from).transpose()?;
        Ok((ts, document_id, document, prev_ts))
    }

    // If `include_prev_rev` is false, then the returned
    // RevisionPair.prev_rev.document will always be None (but prev_rev.ts will
    // still be correct).
    #[try_stream(
        ok = RevisionPair,
        error = anyhow::Error,
    )]
    async fn _load_documents(
        &self,
        tablet_id: Option<TabletId>,
        include_prev_rev: bool,
        range: TimestampRange,
        order: Order,
        page_size: u32,
        retention_validator: Arc<dyn RetentionValidator>,
    ) {
        anyhow::ensure!(page_size > 0); // 0-size pages loop forever.
        let timer = metrics::load_documents_timer(self.read_pool.cluster_name());
        let mut client = self
            .read_pool
            .acquire("load_documents", &self.db_name)
            .await?;
        let mut num_returned = 0;
        let mut last_ts = match order {
            Order::Asc => Timestamp::MIN,
            Order::Desc => Timestamp::MAX,
        };
        let mut last_tablet_id_param = Self::initial_id_param(order);
        let mut last_id_param = Self::initial_id_param(order);
        loop {
            let mut rows_loaded = 0;
            let query = match order {
                Order::Asc => sql::load_docs_by_ts_page_asc(
                    self.multitenant,
                    tablet_id.is_some(),
                    include_prev_rev,
                ),
                Order::Desc => sql::load_docs_by_ts_page_desc(
                    self.multitenant,
                    tablet_id.is_some(),
                    include_prev_rev,
                ),
            };
            let mut params = vec![
                i64::from(range.min_timestamp_inclusive()).into(),
                i64::from(range.max_timestamp_exclusive()).into(),
                i64::from(last_ts).into(),
                i64::from(last_ts).into(),
                last_tablet_id_param.clone().into(),
                last_tablet_id_param.clone().into(),
                last_id_param.clone().into(),
            ];
            if let Some(tablet_id) = tablet_id {
                params.push(tablet_id.0 .0.into());
            }
            if self.multitenant {
                params.push(self.instance_name.to_string().into());
            }
            params.push((page_size as i64).into());
            let row_stream = client
                .query_stream(query, params, page_size as usize)
                .await?;
            retention_validator
                .validate_document_snapshot(range.min_timestamp_inclusive())
                .await?;
            futures::pin_mut!(row_stream);
            while let Some(row) = row_stream.try_next().await? {
                let prev_rev_value: Option<Vec<u8>> = if include_prev_rev {
                    row.get_opt(6).context("missing column")??
                } else {
                    None
                };
                let (ts, document_id, document, prev_ts) = self.row_to_document(row)?;
                let prev_rev_document: Option<ResolvedDocument> = prev_rev_value
                    .map(|v| {
                        let json_value: JsonValue = serde_json::from_slice(&v)
                            .context("Failed to deserialize database value")?;
                        // N.B.: previous revisions should never be deleted, so we don't check that.
                        let value: ConvexValue = json_value.try_into()?;
                        ResolvedDocument::from_database(document_id.table(), value)
                    })
                    .transpose()?;
                rows_loaded += 1;
                last_ts = ts;
                last_tablet_id_param = internal_id_param(document_id.table().0);
                last_id_param = internal_doc_id_param(document_id);
                num_returned += 1;
                yield RevisionPair {
                    id: document_id,
                    rev: DocumentRevision { ts, document },
                    prev_rev: prev_ts.map(|prev_ts| DocumentRevision {
                        ts: prev_ts,
                        document: prev_rev_document,
                    }),
                };
            }
            if rows_loaded < page_size {
                break;
            }
        }
        metrics::finish_load_documents_timer(timer, num_returned, self.read_pool.cluster_name());
    }
    #[allow(clippy::needless_lifetimes)]
    #[try_stream(ok = (IndexKeyBytes, LatestDocument), error = anyhow::Error)]
    async fn _index_scan(
        &self,
        index_id: IndexId,
        tablet_id: TabletId,
        read_timestamp: Timestamp,
        interval: Interval,
        order: Order,
        size_hint: usize,
        retention_validator: Arc<dyn RetentionValidator>,
    ) {
        let scan = self._index_scan_inner(
            index_id,
            read_timestamp,
            interval,
            order,
            size_hint,
            retention_validator,
        );
        pin_mut!(scan);
        while let Some((key, ts, value, prev_ts)) = scan.try_next().await? {
            let document = ResolvedDocument::from_database(tablet_id, value)?;
            yield (
                key,
                LatestDocument {
                    ts,
                    value: document,
                    prev_ts,
                },
            );
        }
    }

    #[allow(clippy::needless_lifetimes)]
    #[try_stream(
        ok = (IndexKeyBytes, Timestamp, ConvexValue, Option<Timestamp>),
        error = anyhow::Error
    )]
    async fn _index_scan_inner(
        &self,
        index_id: IndexId,
        read_timestamp: Timestamp,
        interval: Interval,
        order: Order,
        size_hint: usize,
        retention_validator: Arc<dyn RetentionValidator>,
    ) {
        let _timer = metrics::query_index_timer(self.read_pool.cluster_name());
        let (mut lower, mut upper) = sql::to_sql_bounds(interval.clone());
        let mut stats = QueryIndexStats::new(self.read_pool.cluster_name());

        // We use the size_hint to determine the batch size. This means in the
        // common case we should do a single query. Exceptions are if the size_hint
        // is wrong or if we truncate it or if we observe too many deletes.
        let mut batch_size =
            size_hint.clamp(*MYSQL_MIN_QUERY_BATCH_SIZE, *MYSQL_MAX_QUERY_BATCH_SIZE);

        // We iterate results in (key_prefix, key_sha256) order while we actually
        // need them in (key_prefix, key_suffix) order. key_suffix is not part of
        // the primary key, so we do the sort here. If we see any record with a
        // maximum-length prefix, we should buffer it until we reach a different
        // prefix.
        let mut result_buffer: Vec<(IndexKeyBytes, Timestamp, ConvexValue, Option<Timestamp>)> =
            Vec::new();
        let mut has_more = true;
        while has_more {
            let page = {
                let mut to_yield = vec![];
                // Avoid holding connections across yield points, to limit lifetime
                // and improve fairness.
                let mut client = self.read_pool.acquire("index_scan", &self.db_name).await?;
                stats.sql_statements += 1;
                let (query, params) = sql::index_query(
                    index_id,
                    read_timestamp,
                    lower.clone(),
                    upper.clone(),
                    order,
                    batch_size,
                    self.multitenant,
                    &self.instance_name,
                );
                let prepare_timer =
                    metrics::query_index_sql_prepare_timer(self.read_pool.cluster_name());
                prepare_timer.finish();
                let execute_timer =
                    metrics::query_index_sql_execute_timer(self.read_pool.cluster_name());
                let row_stream = client.query_stream(query, params, batch_size).await?;
                execute_timer.finish();
                let retention_validate_timer =
                    metrics::retention_validate_timer(self.read_pool.cluster_name());
                retention_validator
                    .validate_snapshot(read_timestamp)
                    .await?;
                retention_validate_timer.finish();
                futures::pin_mut!(row_stream);
                let mut batch_rows = 0;
                while let Some(row) = row_stream.try_next().await? {
                    batch_rows += 1;
                    stats.rows_read += 1;

                    // Fetch the index entry.
                    let internal_row = parse_row(&row)?;

                    // Yield buffered results if applicable.
                    if let Some((buffer_key, ..)) = result_buffer.first() {
                        if buffer_key[..MAX_INDEX_KEY_PREFIX_LEN] != internal_row.key_prefix {
                            // We have exhausted all results that share the same key prefix;
                            // we can sort and yield the buffered results.
                            result_buffer.sort_by(|a, b| a.0.cmp(&b.0));
                            for (key, ts, doc, prev_ts) in order.apply(result_buffer.drain(..)) {
                                if interval.contains(&key) {
                                    stats.rows_returned += 1;
                                    to_yield.push((key, ts, doc, prev_ts));
                                } else {
                                    stats.rows_skipped_out_of_range += 1;
                                }
                            }
                        }
                    }

                    // Update the bounds for future queries.
                    let bound = Bound::Excluded(sql::SqlKey {
                        prefix: internal_row.key_prefix.clone(),
                        sha256: internal_row.key_sha256.clone(),
                    });
                    match order {
                        Order::Asc => lower = bound,
                        Order::Desc => upper = bound,
                    }

                    // Filter if needed.
                    if internal_row.deleted {
                        stats.rows_skipped_deleted += 1;
                        continue;
                    }

                    // Construct the key.
                    let mut key = internal_row.key_prefix;
                    if let Some(key_suffix) = internal_row.key_suffix {
                        key.extend(key_suffix);
                    };
                    let ts = internal_row.ts;

                    // Fetch the remaining columns and construct the document.
                    let table_b: Option<Vec<u8>> = row.get(7).unwrap();
                    table_b.ok_or_else(|| {
                        anyhow::anyhow!("Dangling index reference for {:?} {:?}", key, ts)
                    })?;
                    let json_value: Vec<u8> = row.get(8).unwrap();
                    let json_value: JsonValue = serde_json::from_slice(&json_value)?;
                    anyhow::ensure!(
                        json_value != serde_json::Value::Null,
                        "Index reference to deleted document {:?} {:?}",
                        key,
                        ts
                    );
                    let value: ConvexValue = json_value.try_into()?;
                    let prev_ts: Option<i64> = row.get(9).unwrap();
                    let prev_ts = prev_ts.map(Timestamp::try_from).transpose()?;

                    if key.len() < MAX_INDEX_KEY_PREFIX_LEN {
                        assert!(result_buffer.is_empty());
                        if interval.contains(&key) {
                            stats.rows_returned += 1;
                            to_yield.push((IndexKeyBytes(key), ts, value, prev_ts));
                        } else {
                            stats.rows_skipped_out_of_range += 1;
                        }
                    } else {
                        // There might be other records with the same key_prefix that
                        // are ordered before this result. Buffer it.
                        result_buffer.push((IndexKeyBytes(key), ts, value, prev_ts));
                        stats.max_rows_buffered =
                            cmp::max(result_buffer.len(), stats.max_rows_buffered);
                    }
                }
                if batch_rows < batch_size {
                    // Yield any remaining values.
                    result_buffer.sort_by(|a, b| a.0.cmp(&b.0));
                    for (key, ts, doc, prev_ts) in order.apply(result_buffer.drain(..)) {
                        if interval.contains(&key) {
                            stats.rows_returned += 1;
                            to_yield.push((key, ts, doc, prev_ts));
                        } else {
                            stats.rows_skipped_out_of_range += 1;
                        }
                    }
                    has_more = false;
                }
                to_yield
            };
            for document in page {
                yield document;
            }
            // Double the batch size every iteration until we reach the max dynamic
            // batch size. This helps correct for tombstones, long prefixes, and
            // wrong client size estimates.
            // TODO: Take size into consideration and increase the max dynamic batch size.
            if batch_size < *MYSQL_MAX_QUERY_DYNAMIC_BATCH_SIZE {
                batch_size = (batch_size * 2).min(*MYSQL_MAX_QUERY_DYNAMIC_BATCH_SIZE);
            }
        }
    }
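    // The buffering above re-sorts rows that share a maximum-length key prefix:
    // SQL returns them in (key_prefix, key_sha256) order, but callers need
    // (key_prefix, key_suffix) order. A standalone sketch of the idea with toy
    // data (illustrative only; not exercising MySqlReader):
    //
    // ```ignore
    // // Buffered rows share the prefix "aa"; their full keys arrived in
    // // hash order, not key order.
    // let mut buffer = vec![
    //     (b"aa9".to_vec(), "row-c"),
    //     (b"aa3".to_vec(), "row-a"),
    //     (b"aa5".to_vec(), "row-b"),
    // ];
    // // Once a new prefix appears, sort the buffer by full key and yield it.
    // buffer.sort_by(|a, b| a.0.cmp(&b.0));
    // assert_eq!(
    //     buffer.iter().map(|r| r.1).collect::<Vec<_>>(),
    //     vec!["row-a", "row-b", "row-c"],
    // );
    // ```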
    fn _index_cursor_params(cursor: Option<&IndexEntry>) -> Vec<mysql_async::Value> {
        let (last_id_param, last_key_prefix, last_sha256, last_ts): (
            Vec<u8>,
            Vec<u8>,
            Vec<u8>,
            u64,
        ) = match cursor {
            Some(cursor) => (
                cursor.index_id.into(),
                cursor.key_prefix.clone(),
                cursor.key_sha256.clone(),
                cursor.ts.into(),
            ),
            None => (Self::initial_id_param(Order::Asc), vec![], vec![], 0),
        };
        vec![
            last_id_param.clone().into(),
            last_id_param.into(),
            last_key_prefix.clone().into(),
            last_key_prefix.into(),
            last_sha256.clone().into(),
            last_sha256.into(),
            last_ts.into(),
        ]
    }

    fn _index_delete_params(query: &mut Vec<mysql_async::Value>, entry: &IndexEntry) {
        let last_id_param: Vec<u8> = entry.index_id.into();
        let last_key_prefix: Vec<u8> = entry.key_prefix.clone();
        let last_sha256: Vec<u8> = entry.key_sha256.clone();
        let last_ts: u64 = entry.ts.into();
        query.push(last_id_param.into());
        query.push(last_key_prefix.into());
        query.push(last_sha256.into());
        query.push(last_ts.into());
    }

    fn _document_delete_params(
        query: &mut Vec<mysql_async::Value>,
        (ts, internal_id): &(Timestamp, InternalDocumentId),
    ) {
        let tablet_id: Vec<u8> = internal_id.table().0.into();
        let id: Vec<u8> = internal_id.internal_id().to_vec();
        let ts: u64 = (*ts).into();
        query.push(tablet_id.into());
        query.push(id.into());
        query.push(ts.into());
    }
}

fn parse_row(row: &Row) -> anyhow::Result<IndexEntry> {
    let bytes: Vec<u8> = row.get(0).unwrap();
    let index_id = InternalId::try_from(bytes).context("index_id wrong size")?;
    let key_prefix: Vec<u8> = row.get(1).unwrap();
    let key_sha256: Vec<u8> = row.get(2).unwrap();
    let key_suffix: Option<Vec<u8>> = row.get(3).unwrap();
    let ts: i64 = row.get(4).unwrap();
    let ts = Timestamp::try_from(ts)?;
    let deleted: bool = row.get(5).unwrap();
    Ok(IndexEntry {
        index_id,
        key_prefix,
        key_suffix,
        key_sha256,
        ts,
        deleted,
    })
}

#[async_trait]
impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
    fn load_documents(
        &self,
        range: TimestampRange,
        order: Order,
        page_size: u32,
        retention_validator: Arc<dyn RetentionValidator>,
    ) -> DocumentStream<'_> {
        self._load_documents(
            None,  /* tablet_id */
            false, /* include_prev_rev */
            range,
            order,
            page_size,
            retention_validator,
        )
        .map_ok(RevisionPair::into_log_entry)
        .boxed()
    }

    fn load_documents_from_table(
        &self,
        tablet_id: TabletId,
        range: TimestampRange,
        order: Order,
        page_size: u32,
        retention_validator: Arc<dyn RetentionValidator>,
    ) -> DocumentStream<'_> {
        self._load_documents(
            Some(tablet_id),
            false, /* include_prev_rev */
            range,
            order,
            page_size,
            retention_validator,
        )
        .map_ok(RevisionPair::into_log_entry)
        .boxed()
    }

    fn load_revision_pairs(
        &self,
        tablet_id: Option<TabletId>,
        range: TimestampRange,
        order: Order,
        page_size: u32,
        retention_validator: Arc<dyn RetentionValidator>,
    ) -> DocumentRevisionStream<'_> {
        self._load_documents(
            tablet_id,
            true, /* include_prev_rev */
            range,
            order,
            page_size,
            retention_validator,
        )
        .boxed()
    }

    async fn previous_revisions_of_documents(
        &self,
        ids: BTreeSet<DocumentPrevTsQuery>,
        retention_validator: Arc<dyn RetentionValidator>,
    ) -> anyhow::Result<BTreeMap<DocumentPrevTsQuery, DocumentLogEntry>> {
        let timer = metrics::previous_revisions_of_documents_timer(self.read_pool.cluster_name());
        let mut client = self
            .read_pool
            .acquire("previous_revisions_of_documents", &self.db_name)
            .await?;
        let ids: Vec<_> = ids.into_iter().collect();
        let mut result = BTreeMap::new();
        let multitenant = self.multitenant;
        let instance_name: mysql_async::Value = (&self.instance_name.raw).into();
        for chunk in smart_chunks(&ids) {
            let mut params = Vec::with_capacity(
                chunk.len() * (sql::EXACT_REV_CHUNK_PARAMS + multitenant as usize),
            );
            let mut id_ts_to_query: HashMap<
                (InternalDocumentId, Timestamp),
                SmallVec<[DocumentPrevTsQuery; 1]>,
            > = HashMap::with_capacity(chunk.len());
            for q @ &DocumentPrevTsQuery { id, ts: _, prev_ts } in chunk {
                params.push(internal_id_param(id.table().0).into());
                params.push(internal_doc_id_param(id).into());
                params.push(i64::from(prev_ts).into());
                if multitenant {
                    params.push(instance_name.clone());
                }
                // The underlying query does not care about `ts` and will
                // deduplicate, so create a map from DB results back to queries.
                id_ts_to_query.entry((id, prev_ts)).or_default().push(*q);
            }
            let result_stream = client
                .query_stream(
                    sql::exact_rev_chunk(chunk.len(), multitenant),
                    params,
                    chunk.len(),
                )
                .await?;
            pin_mut!(result_stream);
            while let Some(row) = result_stream.try_next().await? {
                let (prev_ts, id, maybe_doc, prev_prev_ts) = self.row_to_document(row)?;
                let entry = DocumentLogEntry {
                    ts: prev_ts,
                    id,
                    value: maybe_doc,
                    prev_ts: prev_prev_ts,
                };
                let original_queries = id_ts_to_query
                    .get(&(id, prev_ts))
                    .context("exact_rev_chunk query returned an unasked row")?;
                for (entry, &q) in
                    iter::repeat_n(entry, original_queries.len()).zip(original_queries)
                {
                    anyhow::ensure!(result.insert(q, entry).is_none());
                }
            }
        }
        if let Some(min_ts) = ids.iter().map(|DocumentPrevTsQuery { ts, .. }| *ts).min() {
            // Validate retention after finding documents.
            retention_validator
                .validate_document_snapshot(min_ts)
                .await?;
        }
        timer.finish();
        Ok(result)
    }
    async fn previous_revisions(
        &self,
        ids: BTreeSet<(InternalDocumentId, Timestamp)>,
        retention_validator: Arc<dyn RetentionValidator>,
    ) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>> {
        let timer = metrics::prev_revisions_timer(self.read_pool.cluster_name());
        let mut client = self
            .read_pool
            .acquire("previous_revisions", &self.db_name)
            .await?;
        let ids: Vec<_> = ids.into_iter().collect();
        let mut result = BTreeMap::new();
        let mut results = vec![];
        let mut min_ts = Timestamp::MAX;
        let multitenant = self.multitenant;
        let instance_name: mysql_async::Value = (&self.instance_name.raw).into();
        for chunk in smart_chunks(&ids) {
            let mut params = Vec::with_capacity(
                chunk.len() * (sql::PREV_REV_CHUNK_PARAMS + multitenant as usize),
            );
            for (id, ts) in chunk {
                params.push(i64::from(*ts).into());
                params.push(internal_id_param(id.table().0).into());
                params.push(internal_doc_id_param(*id).into());
                params.push(i64::from(*ts).into());
                if multitenant {
                    params.push(instance_name.clone());
                }
                min_ts = cmp::min(*ts, min_ts);
            }
            let result_stream = client
                .query_stream(
                    sql::prev_rev_chunk(chunk.len(), multitenant),
                    params,
                    chunk.len(),
                )
                .await?;
            pin_mut!(result_stream);
            while let Some(result) = result_stream.try_next().await? {
                results.push(result);
            }
        }
        for row in results.into_iter() {
            let ts: i64 = row.get(6).unwrap();
            let ts = Timestamp::try_from(ts)?;
            let (prev_ts, id, maybe_doc, prev_prev_ts) = self.row_to_document(row)?;
            anyhow::ensure!(result
                .insert(
                    (id, ts),
                    DocumentLogEntry {
                        ts: prev_ts,
                        id,
                        value: maybe_doc,
                        prev_ts: prev_prev_ts,
                    }
                )
                .is_none());
            log_prev_revisions_row_read(self.read_pool.cluster_name());
        }
        retention_validator
            .validate_document_snapshot(min_ts)
            .await?;
        timer.finish();
        Ok(result)
    }

    fn index_scan(
        &self,
        index_id: IndexId,
        tablet_id: TabletId,
        read_timestamp: Timestamp,
        range: &Interval,
        order: Order,
        size_hint: usize,
        retention_validator: Arc<dyn RetentionValidator>,
    ) -> IndexStream<'_> {
        self._index_scan(
            index_id,
            tablet_id,
            read_timestamp,
            range.clone(),
            order,
            size_hint,
            retention_validator,
        )
        .boxed()
    }

    async fn get_persistence_global(
        &self,
        key: PersistenceGlobalKey,
    ) -> anyhow::Result<Option<JsonValue>> {
        let mut client = self
            .read_pool
            .acquire("get_persistence_global", &self.db_name)
            .await?;
        let mut params = vec![String::from(key).into()];
        if self.multitenant {
            params.push(self.instance_name.to_string().into());
        }
        let row_stream = client
            .query_stream(sql::get_persistence_global(self.multitenant), params, 1)
            .await?;
        futures::pin_mut!(row_stream);
        let row = row_stream.try_next().await?;
        let value = row.map(|r| -> anyhow::Result<JsonValue> {
            let binary_value: Vec<u8> = r.get(0).unwrap();
            let mut json_deserializer = serde_json::Deserializer::from_slice(&binary_value);
            // XXX: this is bad, but shapes can get much more nested than convex values.
            json_deserializer.disable_recursion_limit();
            let json_value = JsonValue::deserialize(&mut json_deserializer)
                .with_context(|| format!("Invalid JSON at persistence key {key:?}"))?;
            json_deserializer.end()?;
            Ok(json_value)
        });
        value.transpose()
    }

    fn version(&self) -> PersistenceVersion {
        self.version
    }

    async fn table_size_stats(&self) -> anyhow::Result<Vec<PersistenceTableSize>> {
        let mut client = self
            .read_pool
            .acquire("table_size_stats", &self.db_name)
            .await?;
        let stats = client
            .query_stream(sql::TABLE_SIZE_QUERY, vec![self.db_name.clone().into()], 5)
            .await?
            .map(|row| {
                let row = row?;
                anyhow::Ok(PersistenceTableSize {
                    table_name: row.get_opt(0).unwrap()?,
                    data_bytes: row.get_opt(1).unwrap()?,
                    index_bytes: row.get_opt(2).unwrap()?,
                    row_count: row.get_opt(3).unwrap()?,
                })
            })
            .try_collect()
            .await?;
        Ok(stats)
    }
}
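// Reads don't require the lease; any `MySqlReader` can serve them, including
// against a stale follower. A minimal streaming sketch (illustrative only;
// `reader`, `range`, and `retention_validator` are assumed to exist):
//
// ```ignore
// let stream = reader.load_documents(range, Order::Asc, 100, retention_validator);
// pin_mut!(stream);
// while let Some(entry) = stream.try_next().await? {
//     // Each entry is a DocumentLogEntry for one (ts, id) revision.
// }
// ```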
/// A `Lease` is unique for an instance across all of the processes in the
/// system. Its purpose is to make it safe to have multiple processes running
/// for the same instance at once, since we cannot truly guarantee that it will
/// not happen (e.g. processes unreachable by the coordinator but still active,
/// or late-delivered packets from an already dead process), and we want to
/// purposefully run multiple so that one can coordinate all writes and the
/// others can serve stale reads, and smoothly swap between them during
/// deployment and node failure.
///
/// The only thing a `Lease` can do is execute a transaction against the
/// database while atomically ensuring that the lease was still held during the
/// transaction, and otherwise return a lease-lost error.
struct Lease<RT: Runtime> {
    pool: Arc<ConvexMySqlPool<RT>>,
    db_name: String,
    instance_name: MySqlInstanceName,
    multitenant: bool,
    lease_ts: i64,
    lease_lost_shutdown: ShutdownSignal,
}

impl<RT: Runtime> Lease<RT> {
    /// Acquire a lease. Makes other lease-holders get `LeaseLostError` when
    /// they commit.
    async fn acquire(
        pool: Arc<ConvexMySqlPool<RT>>,
        db_name: String,
        instance_name: MySqlInstanceName,
        multitenant: bool,
        lease_lost_shutdown: ShutdownSignal,
    ) -> anyhow::Result<Self> {
        let timer = metrics::lease_acquire_timer(pool.cluster_name());
        let mut client = pool.acquire("lease_acquire", &db_name).await?;
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("before 1970")
            .as_nanos() as i64;
        tracing::info!("attempting to acquire lease");
        let mut params = vec![ts.into(), ts.into()];
        if multitenant {
            params.push((&instance_name.raw).into());
        }
        let rows_modified = client
            .exec_iter(sql::lease_acquire(multitenant), params)
            .await?;
        anyhow::ensure!(
            rows_modified == 1,
            "failed to acquire lease: already acquired with a higher timestamp"
        );
        tracing::info!("lease acquired with ts {}", ts);
        timer.finish();
        Ok(Self {
            db_name,
            pool,
            lease_ts: ts,
            lease_lost_shutdown,
            instance_name,
            multitenant,
        })
    }

    /// Execute the transaction function `f`, atomically ensuring that the
    /// lease is still held; otherwise return a lease-lost error.
    ///
    /// Once `transact` returns a lease-lost error, no future transactions
    /// using it will succeed. Instead, a new `Lease` must be made with
    /// `acquire`, and any in-memory state must then be resynced because of any
    /// changes that might've been made to the database state while the lease
    /// was not held.
    #[fastrace::trace]
    async fn transact<F, T>(&self, f: F) -> anyhow::Result<T>
    where
        F: for<'a> AsyncFnOnce(&'a mut MySqlTransaction<'_>) -> anyhow::Result<T>,
    {
        let mut client = self.pool.acquire("transact", &self.db_name).await?;
        let mut tx = client.transaction(&self.db_name).await?;
        let timer = metrics::lease_precond_timer(self.pool.cluster_name());
        let mut params = vec![mysql_async::Value::Int(self.lease_ts)];
        if self.multitenant {
            params.push((&self.instance_name.raw).into());
        }
        let rows: Option<Row> = tx
            .exec_first(sql::lease_precond(self.multitenant), params)
            .in_span(Span::enter_with_local_parent(format!(
                "{}::lease_precondition",
                func_path!()
            )))
            .await?;
        if rows.is_none() {
            self.lease_lost_shutdown.signal(lease_lost_error());
            anyhow::bail!(lease_lost_error());
        }
        timer.finish();
        let result = f(&mut tx)
            .in_span(Span::enter_with_local_parent(format!(
                "{}::execute_function",
                func_path!()
            )))
            .await?;
        let timer = metrics::commit_timer(self.pool.cluster_name());
        tx.commit().await?;
        timer.finish();
        Ok(result)
    }
}
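// Every write-path method above funnels through `Lease::transact`, which
// re-checks the lease inside the transaction before committing. The calling
// pattern used throughout this file:
//
// ```ignore
// self.lease
//     .transact(async move |tx| {
//         tx.exec_drop(statement, params).await?;
//         Ok(())
//     })
//     .await?;
// ```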
fn document_params(
    mut query: Vec<mysql_async::Value>,
    ts: Timestamp,
    id: InternalDocumentId,
    maybe_doc: Option<ResolvedDocument>,
    prev_ts: Option<Timestamp>,
) -> anyhow::Result<Vec<mysql_async::Value>> {
    let (json_str, deleted) = match maybe_doc {
        Some(document) => (document.value().json_serialize()?, false),
        None => (serde_json::Value::Null.to_string(), true),
    };
    query.push(internal_doc_id_param(id).into());
    query.push(i64::from(ts).into());
    query.push(internal_id_param(id.table().0).into());
    query.push(mysql_async::Value::Bytes(json_str.into_bytes()));
    query.push(deleted.into());
    query.push(prev_ts.map(i64::from).into());
    Ok(query)
}

fn internal_id_param(id: InternalId) -> Vec<u8> {
    id.into()
}

fn internal_doc_id_param(id: InternalDocumentId) -> Vec<u8> {
    internal_id_param(id.internal_id())
}

fn index_params(query: &mut Vec<mysql_async::Value>, update: &PersistenceIndexEntry) {
    let key: Vec<u8> = update.key.to_vec();
    let key_sha256 = Sha256::hash(&key);
    let key = SplitKey::new(key);
    let (deleted, tablet_id, doc_id) = match &update.value {
        None => (true, None, None),
        Some(doc_id) => (
            false,
            Some(internal_id_param(doc_id.table().0)),
            Some(internal_doc_id_param(*doc_id)),
        ),
    };
    query.push(internal_id_param(update.index_id).into());
    query.push(i64::from(update.ts).into());
    query.push(key.prefix.into());
    query.push(key.suffix.into());
    query.push(key_sha256.to_vec().into());
    query.push(deleted.into());
    query.push(tablet_id.into());
    query.push(doc_id.into());
}

#[cfg(any(test, feature = "testing"))]
pub mod itest {
    use std::path::Path;

    use mysql_async::{
        prelude::Queryable,
        Conn,
        Params,
    };
    use rand::Rng;
    use url::Url;

    // Returns a URL to connect to the test cluster. The URL includes username
    // and password but no dbname.
    pub fn cluster_opts() -> String {
        let mysql_host = if Path::new("/convex.ro").exists() {
            // itest
            "mysql"
        } else {
            // local
            "localhost"
        };
        format!("mysql://root:@{mysql_host}:3306")
    }

    pub struct MySqlOpts {
        pub db_name: String,
        pub url: Url,
    }

    /// Returns connection options for a guaranteed-fresh MySQL database.
    pub async fn new_db_opts() -> anyhow::Result<MySqlOpts> {
        let cluster_url = cluster_opts();
        let id: [u8; 16] = rand::rng().random();
        let db_name = "test_db_".to_string() + &hex::encode(&id[..]);

        // Connect using db `mysql`, create a fresh DB, and then return the
        // connection options for that one.
        let mut conn = Conn::from_url(format!("{cluster_url}/mysql")).await?;
        let query = "CREATE DATABASE ".to_string() + &db_name;
        conn.exec_drop(query.as_str(), Params::Empty).await?;
        println!("DBNAME @{db_name}");

        Ok(MySqlOpts {
            // We use the cluster URL to connect to persistence and then pass
            // the db_name in the queries themselves.
            url: cluster_url.parse()?,
            db_name,
        })
    }
}
