Skip to main content
Glama

Convex MCP server

Official
by get-convex
sql.rs34.8 kB
use std::{ collections::HashMap, fmt::Write, iter, sync::LazyLock, }; use common::{ index::{ SplitKey, MAX_INDEX_KEY_PREFIX_LEN, }, interval::{ End, Interval, StartIncluded, }, query::Order, types::{ IndexId, Timestamp, }, }; use const_format::formatcp; use itertools::{ iproduct, Itertools, }; use crate::{ chunks::smart_chunk_sizes, BoundType, MySqlInstanceName, }; macro_rules! as_table { ([$param:ident $(, $rest:ident)*], $e: expr) => {{ [{ #[allow(non_upper_case_globals)] const $param: bool = false; as_table!([$($rest),*], $e) }, { #[allow(non_upper_case_globals)] const $param: bool = true; as_table!([$($rest),*], $e) }] }}; ([], $e: expr) => { $e } } /// Returns the appropriate expression based on the parameter value. macro_rules! tableify { ([$($param:ident),+], $e: expr) => {{ as_table!([$($param),*], $e) $( [$param as usize] )* }}; ($param:ident, $e: expr) => { tableify!([$param], $e) }; } pub const GET_TABLE_COUNT: &str = r#" SELECT COUNT(1) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ?; "#; // Expected table count after INIT_SQL is ran. pub const EXPECTED_TABLE_COUNT: usize = 5; // This runs (currently) every time a MySqlPersistence is created, so it // needs to not only be idempotent but not to affect any already-resident data. // IF NOT EXISTS and ON CONFLICT are helpful. pub const fn init_sql(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( r#" CREATE TABLE IF NOT EXISTS @db_name.documents ( {instance_col_def} id VARBINARY(32) NOT NULL, ts BIGINT NOT NULL, table_id VARBINARY(32) NOT NULL, json_value LONGBLOB NOT NULL, deleted BOOLEAN DEFAULT false, prev_ts BIGINT, PRIMARY KEY ({instance_col} ts, table_id, id), INDEX documents_by_table_and_id ({instance_col} table_id, id, ts) ) ROW_FORMAT=DYNAMIC; CREATE TABLE IF NOT EXISTS @db_name.indexes ( {instance_col_def} /* ids should be serialized as bytes but we keep it compatible with documents */ index_id VARBINARY(32) NOT NULL, ts BIGINT NOT NULL, /* MySQL maximum primary key length is 3072 bytes with DYNAMIC row format, which is why we split up the key. The first 2500 bytes are stored in key_prefix, and the remaining ones are stored in key suffix if applicable. NOTE: The key_prefix + key_suffix is store all values of IndexKey including the id. */ key_prefix VARBINARY(2500) NOT NULL, key_suffix LONGBLOB NULL, /* key_sha256 of the full key, used in primary key to avoid duplicates in case of key_prefix collision. */ key_sha256 BINARY(32) NOT NULL, deleted BOOLEAN, /* table_id and document_id should be populated iff deleted is false. */ table_id VARBINARY(32) NULL, document_id VARBINARY(32) NULL, PRIMARY KEY ({instance_col} index_id, key_prefix, key_sha256, ts) ) ROW_FORMAT=DYNAMIC; CREATE TABLE IF NOT EXISTS @db_name.leases ( {lease_col_def}, ts BIGINT NOT NULL, PRIMARY KEY ({lease_pk}) ) ROW_FORMAT=DYNAMIC; CREATE TABLE IF NOT EXISTS @db_name.read_only ( {read_only_col_def}, PRIMARY KEY ({read_only_pk}) ) ROW_FORMAT=DYNAMIC; CREATE TABLE IF NOT EXISTS @db_name.persistence_globals ( {instance_col_def} `key` VARCHAR(255) NOT NULL, json_value LONGBLOB NOT NULL, PRIMARY KEY ({instance_col} `key`) ) ROW_FORMAT=DYNAMIC;"#, instance_col_def = if multitenant { "instance_name VARCHAR(64) NOT NULL," } else { "" }, instance_col = if multitenant { "instance_name," } else { "" }, lease_col_def = if multitenant { "instance_name VARCHAR(64) NOT NULL" } else { "id BIGINT NOT NULL" }, lease_pk = if multitenant { "instance_name" } else { "id" }, read_only_col_def = if multitenant { "instance_name VARCHAR(64) NOT NULL" } else { "id BIGINT NOT NULL" }, read_only_pk = if multitenant { "instance_name" } else { "id" } ) ) } pub const fn init_lease(multitenant: bool) -> &'static str { tableify!( multitenant, // Note the no-op `ON DUPLICATE` expression to "do nothing" if there's a duplicate. // INSERT IGNORE ignores *all* errors, so this is considered best practice.. formatcp!( "INSERT INTO @db_name.leases ({lease_col_def}, ts) VALUES ({lease_val}, 0) ON \ DUPLICATE KEY UPDATE {lease_col_def} = {lease_col_def};", lease_col_def = if multitenant { "instance_name" } else { "id" }, lease_val = if multitenant { "?" } else { "1" } ) ) } /// Load a page of documents, where timestamps are bounded by [$1, $2), /// and ($3, $4, $5) is the (ts, table_id, id) from the last document read. pub const fn load_docs_by_ts_page_asc( multitenant: bool, tablet_filter: bool, include_prev_rev: bool, ) -> &'static str { tableify!([multitenant, tablet_filter, include_prev_rev], { formatcp!( r#"SELECT D.id, D.ts, D.table_id, D.json_value, D.deleted, D.prev_ts {prev_rev_col} FROM @db_name.documents D FORCE INDEX FOR ORDER BY (PRIMARY) {prev_rev_join} WHERE D.ts >= ? AND D.ts < ? AND (D.ts > ? OR (D.ts = ? AND (D.table_id > ? OR (D.table_id = ? AND D.id > ?)))) {tablet_filter} {instance_name_filter} ORDER BY D.ts ASC, D.table_id ASC, D.id ASC LIMIT ? "#, prev_rev_col = if include_prev_rev { ", P.json_value" } else { "" }, prev_rev_join = if include_prev_rev { formatcp!( "LEFT JOIN @db_name.documents P FORCE INDEX FOR JOIN (PRIMARY) ON P.table_id = D.table_id AND P.id = D.id AND P.ts = D.prev_ts {}", if multitenant { "AND P.instance_name = D.instance_name" } else { "" } ) } else { "" }, tablet_filter = if tablet_filter { "AND D.table_id = ?" } else { "" }, instance_name_filter = if multitenant { "AND D.instance_name = ?" } else { "" }, ) }) } pub const fn load_docs_by_ts_page_desc( multitenant: bool, tablet_filter: bool, include_prev_rev: bool, ) -> &'static str { tableify!([multitenant, tablet_filter, include_prev_rev], { formatcp!( r#"SELECT D.id, D.ts, D.table_id, D.json_value, D.deleted, D.prev_ts {prev_rev_col} FROM @db_name.documents D FORCE INDEX FOR ORDER BY (PRIMARY) {prev_rev_join} WHERE D.ts >= ? AND D.ts < ? AND (D.ts < ? OR (D.ts = ? AND (D.table_id < ? OR (D.table_id = ? AND D.id < ?)))) {tablet_filter} {instance_name_filter} ORDER BY D.ts DESC, D.table_id DESC, D.id DESC LIMIT ? "#, prev_rev_col = if include_prev_rev { ", P.json_value" } else { "" }, prev_rev_join = if include_prev_rev { formatcp!( "LEFT JOIN @db_name.documents P FORCE INDEX FOR JOIN (PRIMARY) ON P.table_id = D.table_id AND P.id = D.id AND P.ts = D.prev_ts {}", if multitenant { "AND P.instance_name = D.instance_name" } else { "" } ) } else { "" }, tablet_filter = if tablet_filter { "AND D.table_id = ?" } else { "" }, instance_name_filter = if multitenant { "AND D.instance_name = ?" } else { "" }, ) }) } pub const INSERT_DOCUMENT_COLUMN_COUNT: usize = 6; static INSERT_DOCUMENT_CHUNK_QUERIES: LazyLock<HashMap<(usize, bool), String>> = LazyLock::new(|| { smart_chunk_sizes() .flat_map(|chunk_size| { [false, true].into_iter().map(move |multitenant| { let query = if multitenant { let values = (1..=chunk_size) .map(|_| "(?, ?, ?, ?, ?, ?, ?)".to_string()) .join(", "); format!( r#"INSERT INTO @db_name.documents (instance_name, id, ts, table_id, json_value, deleted, prev_ts) VALUES {values}"# ) } else { let values = (1..=chunk_size) .map(|_| "(?, ?, ?, ?, ?, ?)".to_string()) .join(", "); format!( r#"INSERT INTO @db_name.documents (id, ts, table_id, json_value, deleted, prev_ts) VALUES {values}"# ) }; ((chunk_size, multitenant), query) }) }) .collect() }); pub fn insert_document_chunk(chunk_size: usize, multitenant: bool) -> &'static str { INSERT_DOCUMENT_CHUNK_QUERIES .get(&(chunk_size, multitenant)) .unwrap() } static INSERT_OVERWRITE_DOCUMENT_CHUNK_QUERIES: LazyLock<HashMap<(usize, bool), String>> = LazyLock::new(|| { smart_chunk_sizes() .flat_map(|chunk_size| { [false, true].into_iter().map(move |multitenant| { let query = if multitenant { let values = (1..=chunk_size) .map(|_| "(?, ?, ?, ?, ?, ?, ?)".to_string()) .join(", "); format!( r#"REPLACE INTO @db_name.documents (instance_name, id, ts, table_id, json_value, deleted, prev_ts) VALUES {values}"# ) } else { let values = (1..=chunk_size) .map(|_| "(?, ?, ?, ?, ?, ?)".to_string()) .join(", "); format!( r#"REPLACE INTO @db_name.documents (id, ts, table_id, json_value, deleted, prev_ts) VALUES {values}"# ) }; ((chunk_size, multitenant), query) }) }) .collect() }); pub fn insert_overwrite_document_chunk(chunk_size: usize, multitenant: bool) -> &'static str { INSERT_OVERWRITE_DOCUMENT_CHUNK_QUERIES .get(&(chunk_size, multitenant)) .unwrap() } pub const fn load_indexes_page(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( r#" SELECT index_id, key_prefix, key_sha256, key_suffix, ts, deleted FROM @db_name.indexes FORCE INDEX FOR ORDER BY (PRIMARY) WHERE index_id > ? OR (index_id = ? AND (key_prefix > ? OR (key_prefix = ? AND (key_sha256 > ? OR (key_sha256 = ? AND ts > ?))))) {where_clause} ORDER BY index_id ASC, key_prefix ASC, key_sha256 ASC, ts ASC LIMIT ? "#, where_clause = if multitenant { "AND instance_name = ?" } else { "" } ) ) } pub const INSERT_INDEX_COLUMN_COUNT: usize = 8; static INSERT_INDEX_CHUNK_QUERIES: LazyLock<HashMap<(usize, bool), String>> = LazyLock::new(|| { smart_chunk_sizes() .flat_map(|chunk_size| { [false, true].into_iter().map(move |multitenant| { let query = if multitenant { let values = (1..=chunk_size) .map(|_| "(?, ?, ?, ?, ?, ?, ?, ?, ?)".to_string()) .join(", "); format!( r#"INSERT INTO @db_name.indexes (instance_name, index_id, ts, key_prefix, key_suffix, key_sha256, deleted, table_id, document_id) VALUES {values}"# ) } else { let values = (1..=chunk_size) .map(|_| "(?, ?, ?, ?, ?, ?, ?, ?)".to_string()) .join(", "); format!( r#"INSERT INTO @db_name.indexes (index_id, ts, key_prefix, key_suffix, key_sha256, deleted, table_id, document_id) VALUES {values}"# ) }; ((chunk_size, multitenant), query) }) }) .collect() }); // Note that on conflict, there's no need to update any of the columns that are // part of the primary key, nor `key_suffix` as `key_sha256` is derived from the // prefix and suffix. // Only the fields that could have actually changed need to be updated. pub fn insert_index_chunk(chunk_size: usize, multitenant: bool) -> &'static str { INSERT_INDEX_CHUNK_QUERIES .get(&(chunk_size, multitenant)) .unwrap() } static INSERT_OVERWRITE_INDEX_CHUNK_QUERIES: LazyLock<HashMap<(usize, bool), String>> = LazyLock::new(|| { smart_chunk_sizes() .flat_map(|chunk_size| { [false, true].into_iter().map(move |multitenant| { let query = if multitenant { let values = (1..=chunk_size) .map(|_| "(?, ?, ?, ?, ?, ?, ?, ?, ?)".to_string()) .join(", "); format!( r#"INSERT INTO @db_name.indexes (instance_name, index_id, ts, key_prefix, key_suffix, key_sha256, deleted, table_id, document_id) VALUES {values} ON DUPLICATE KEY UPDATE deleted = VALUES(deleted), table_id = VALUES(table_id), document_id = VALUES(document_id) "# ) } else { let values = (1..=chunk_size) .map(|_| "(?, ?, ?, ?, ?, ?, ?, ?)".to_string()) .join(", "); format!( r#"INSERT INTO @db_name.indexes (index_id, ts, key_prefix, key_suffix, key_sha256, deleted, table_id, document_id) VALUES {values} ON DUPLICATE KEY UPDATE deleted = VALUES(deleted), table_id = VALUES(table_id), document_id = VALUES(document_id) "# ) }; ((chunk_size, multitenant), query) }) }) .collect() }); pub fn insert_overwrite_index_chunk(chunk_size: usize, multitenant: bool) -> &'static str { INSERT_OVERWRITE_INDEX_CHUNK_QUERIES .get(&(chunk_size, multitenant)) .unwrap() } pub const DELETE_INDEX_COLUMN_COUNT: usize = 4; static DELETE_INDEX_CHUNK_QUERIES: LazyLock<HashMap<(usize, bool), String>> = LazyLock::new(|| { smart_chunk_sizes() .flat_map(|chunk_size| { [false, true].into_iter().map(move |multitenant| { let where_clauses = (1..=chunk_size) .map(|_| { if multitenant { "(index_id = ? AND key_prefix = ? AND key_sha256 = ? AND ts <= ? AND \ instance_name = ?)" } else { "(index_id = ? AND key_prefix = ? AND key_sha256 = ? AND ts <= ?)" } }) .join(" OR "); ( (chunk_size, multitenant), format!("DELETE FROM @db_name.indexes WHERE {where_clauses}"), ) }) }) .collect() }); pub fn delete_index_chunk(chunk_size: usize, multitenant: bool) -> &'static str { DELETE_INDEX_CHUNK_QUERIES .get(&(chunk_size, multitenant)) .unwrap() } pub const DELETE_DOCUMENT_COLUMN_COUNT: usize = 3; static DELETE_DOCUMENT_CHUNK_QUERIES: LazyLock<HashMap<(usize, bool), String>> = LazyLock::new(|| { smart_chunk_sizes() .flat_map(|chunk_size| { [false, true].into_iter().map(move |multitenant| { let where_clauses = (1..=chunk_size) .map(|_| { if multitenant { "(table_id = ? AND id = ? AND ts <= ? AND instance_name = ?)" } else { "(table_id = ? AND id = ? AND ts <= ?)" } }) .join(" OR "); ( (chunk_size, multitenant), // Note the use of "multi-table DELETE syntax" (`DELETE table // FROM table WHERE ...`) which MySQL requires for FORCE INDEX // syntax format!( "DELETE @db_name.documents FROM @db_name.documents FORCE INDEX \ (documents_by_table_and_id) WHERE {where_clauses}" ), ) }) }) .collect() }); pub fn delete_document_chunk(chunk_size: usize, multitenant: bool) -> &'static str { DELETE_DOCUMENT_CHUNK_QUERIES .get(&(chunk_size, multitenant)) .unwrap() } pub const fn write_persistence_global(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( r#"INSERT INTO @db_name.persistence_globals ({instance_col} `key`, json_value) VALUES ({instance_val} ?, ?) ON DUPLICATE KEY UPDATE json_value = VALUES(json_value) "#, instance_col = if multitenant { "instance_name," } else { "" }, instance_val = if multitenant { "?," } else { "" } ) ) } pub const fn get_persistence_global(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( r#"SELECT json_value FROM @db_name.persistence_globals FORCE INDEX (PRIMARY) WHERE `key` = ? {instance_clause}"#, instance_clause = if multitenant { "AND instance_name = ?" } else { "" } ) ) } // Maximum number of writes within a single transaction. This is the sum of // TRANSACTION_MAX_SYSTEM_NUM_WRITES and TRANSACTION_MAX_NUM_USER_WRITES. pub const MAX_INSERT_SIZE: usize = 56000; // Gross: after initialization, the first thing database does is insert metadata // documents. pub const fn check_newly_created(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( r#"SELECT 1 FROM @db_name.documents {instance_clause} LIMIT 1"#, instance_clause = if multitenant { "WHERE instance_name = ?" } else { "" } ) ) } // This table has no rows (not read_only) or 1 row (read_only), so if this query // returns any results, the persistence is read_only. pub const fn check_is_read_only(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( "SELECT 1 FROM @db_name.read_only {instance_clause} LIMIT 1", instance_clause = if multitenant { "WHERE instance_name = ?" } else { "" } ) ) } pub const fn set_read_only(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( "INSERT INTO @db_name.read_only ({read_only_col}) VALUES ({read_only_val})", read_only_col = if multitenant { "instance_name" } else { "id" }, read_only_val = if multitenant { "?" } else { "1" } ) ) } pub const fn unset_read_only(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( "DELETE FROM @db_name.read_only WHERE {read_only_col} = {read_only_val}", read_only_col = if multitenant { "instance_name" } else { "id" }, read_only_val = if multitenant { "?" } else { "1" } ) ) } // If this query returns a result, the lease is still valid and will remain so // until the end of the transaction. pub const fn lease_precond(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( "SELECT 1 FROM @db_name.leases FORCE INDEX (PRIMARY) WHERE ts=? AND {lease_cond} FOR \ SHARE", lease_cond = if multitenant { "instance_name = ?" } else { "id = 1" } ) ) } // Acquire the lease unless acquire by someone with a higher timestamp. pub const fn lease_acquire(multitenant: bool) -> &'static str { tableify!( multitenant, formatcp!( "UPDATE @db_name.leases SET ts=? WHERE ts<? AND {lease_cond}", lease_cond = if multitenant { "instance_name = ?" } else { "id = 1" } ) ) } // Pre-build queries with various parameters. // // Tricks that convince MySQL to choose good query plans: // 1. All queries are ordered by a prefix of columns in the primary key. If you // say `WHERE col1 = 'a' ORDER BY col2 ASC` it might not use the index, but // `WHERE col1 = 'a' ORDER BY col1 ASC, col2 ASC` which is completely // equivalent, does use the index. // 2. LEFT JOIN and FORCE INDEX FOR JOIN makes the join use the index for // lookups. Despite having all index columns with equality checks, MySQL will // do a hash join if you do an INNER JOIN or a plain FORCE INDEX. // 3. Tuple comparisons `(key_prefix, key_sha256) >= (?, ?)` are required for // Postgres to choose the correct query plan, but MySQL requires the other // format `(key_prefix > ? OR (key_prefix = ? AND key_sha256 >= ?))`. // // Note, we always paginate using (key_prefix, key_sha256), which doesn't // necessary give us the order we need for long keys that have key_suffix. static INDEX_QUERIES: LazyLock<[HashMap<(BoundType, BoundType, Order), String>; 2]> = LazyLock::new( || { let mut single_tenant = HashMap::new(); let mut multi_tenant = HashMap::new(); let bounds = [ BoundType::Unbounded, BoundType::Included, BoundType::Excluded, ]; let orders = [Order::Asc, Order::Desc]; let multitenant_options = [false, true]; for (lower, upper, order, multitenant) in iproduct!( bounds.iter(), bounds.iter(), orders.iter(), multitenant_options.iter() ) { // Build WHERE clause let mut where_clause = String::new(); if *multitenant { write!(where_clause, "instance_name = ? AND ").unwrap(); } write!(where_clause, "index_id = ? AND ts <= ?").unwrap(); // Add bound conditions match lower { BoundType::Unbounded => {}, BoundType::Included => { write!( where_clause, " AND (key_prefix > ? OR (key_prefix = ? AND key_sha256 >= ?))", ) .unwrap(); }, BoundType::Excluded => { write!( where_clause, " AND (key_prefix > ? OR (key_prefix = ? AND key_sha256 > ?))" ) .unwrap(); }, }; match upper { BoundType::Unbounded => {}, BoundType::Included => { write!( where_clause, " AND (key_prefix < ? OR (key_prefix = ? AND key_sha256 <= ?))" ) .unwrap(); }, BoundType::Excluded => { write!( where_clause, " AND (key_prefix < ? OR (key_prefix = ? AND key_sha256 < ?))" ) .unwrap(); }, }; let order_str = match order { Order::Asc => "ASC", Order::Desc => "DESC", }; // Build instance-specific clauses let ( select_instance_col, group_by_instance, join_instance_i1, join_instance_snapshot, doc_join_instance_cond, ) = if *multitenant { ( "I1.instance_name, ", "instance_name, ", "I1.instance_name, ", "snapshot.instance_name, ", "D.instance_name = I2.instance_name AND ", ) } else { ("", "", "", "", "") }; let query = format!( r#" SELECT I2.index_id, I2.key_prefix, I2.key_sha256, I2.key_suffix, I2.ts, I2.deleted, I2.document_id, D.table_id, D.json_value, D.prev_ts FROM ( SELECT {select_instance_col}I1.index_id, I1.key_prefix, I1.key_sha256, I1.key_suffix, I1.ts, I1.deleted, I1.table_id, I1.document_id FROM ( SELECT {group_by_instance}index_id, key_prefix, key_sha256, MAX(ts) as ts_at_snapshot FROM @db_name.indexes FORCE INDEX FOR GROUP BY (PRIMARY) WHERE {where_clause} GROUP BY {group_by_instance}index_id, key_prefix, key_sha256 ORDER BY index_id {order_str}, key_prefix {order_str}, key_sha256 {order_str} LIMIT ? ) snapshot LEFT JOIN @db_name.indexes I1 FORCE INDEX FOR JOIN (PRIMARY) ON ({join_instance_i1}I1.index_id, I1.key_prefix, I1.key_sha256, I1.ts) = ({join_instance_snapshot}snapshot.index_id, snapshot.key_prefix, snapshot.key_sha256, snapshot.ts_at_snapshot) ) I2 LEFT JOIN @db_name.documents D FORCE INDEX FOR JOIN (PRIMARY) ON {doc_join_instance_cond}D.ts = I2.ts AND D.table_id = I2.table_id AND D.id = I2.document_id -- Ensure deterministic final ordering across pages after the join ORDER BY I2.key_prefix {order_str}, I2.key_sha256 {order_str} "# ); if *multitenant { multi_tenant.insert((*lower, *upper, *order), query); } else { single_tenant.insert((*lower, *upper, *order), query); } } [single_tenant, multi_tenant] }, ); pub fn index_queries(multitenant: bool) -> &'static HashMap<(BoundType, BoundType, Order), String> { &INDEX_QUERIES[multitenant as usize] } // Multitenant variants of the index queries. Filters by instance_name and // ensures joins include instance_name so rows from other tenants cannot match. // (Removed) separate multitenant map; we now use INDEX_QUERIES[bool] with a // getter. // Parameter count for exact_rev_chunk queries: table_id, id, ts, // [instance_name] pub const EXACT_REV_CHUNK_PARAMS: usize = 3; static EXACT_REV_CHUNK_QUERIES: LazyLock<HashMap<(usize, bool), String>> = LazyLock::new(|| { smart_chunk_sizes() .flat_map(|chunk_size| { [false, true].into_iter().map(move |multitenant| { let where_clause = if multitenant { std::iter::repeat_n( "(table_id = ? AND id = ? AND ts = ? AND instance_name = ?)", chunk_size, ) .join(" OR ") } else { std::iter::repeat_n("(table_id = ? AND id = ? AND ts = ?)", chunk_size) .join(" OR ") }; ( (chunk_size, multitenant), format!( "SELECT id, ts, table_id, json_value, deleted, prev_ts FROM @db_name.documents FORCE INDEX (PRIMARY) WHERE {where_clause} ORDER BY ts ASC, table_id ASC, id ASC" ), ) }) }) .collect() }); pub fn exact_rev_chunk(chunk_size: usize, multitenant: bool) -> &'static str { EXACT_REV_CHUNK_QUERIES .get(&(chunk_size, multitenant)) .unwrap() } // Parameter count for prev_rev_chunk queries: query_ts, table_id, id, ts, // [instance_name] pub const PREV_REV_CHUNK_PARAMS: usize = 4; static PREV_REV_CHUNK_QUERIES: LazyLock<HashMap<(usize, bool), String>> = LazyLock::new(|| { smart_chunk_sizes() .flat_map(|chunk_size| { [false, true].into_iter().map(move |multitenant| { let select = if multitenant { r#" SELECT id, ts, table_id, json_value, deleted, prev_ts, ? as query_ts FROM @db_name.documents FORCE INDEX FOR ORDER BY (documents_by_table_and_id) WHERE table_id = ? AND id = ? and ts < ? AND instance_name = ? ORDER BY table_id DESC, id DESC, ts DESC LIMIT 1 "# } else { r#" SELECT id, ts, table_id, json_value, deleted, prev_ts, ? as query_ts FROM @db_name.documents FORCE INDEX FOR ORDER BY (documents_by_table_and_id) WHERE table_id = ? AND id = ? and ts < ? ORDER BY table_id DESC, id DESC, ts DESC LIMIT 1 "# }; let queries = iter::repeat_n(&format!("({select})"), chunk_size).join(" UNION ALL "); ((chunk_size, multitenant), queries) }) }) .collect() }); pub fn prev_rev_chunk(chunk_size: usize, multitenant: bool) -> &'static str { PREV_REV_CHUNK_QUERIES .get(&(chunk_size, multitenant)) .unwrap() } // TODO: This is now incorrect for multitenant databases. pub const TABLE_SIZE_QUERY: &str = " SELECT table_name, data_length, index_length, table_rows FROM information_schema.tables WHERE table_schema = ? "; pub const MIN_SHA256: [u8; 32] = [0; 32]; pub const MAX_SHA256: [u8; 32] = [255; 32]; // The key we use to paginate in SQL, note that we can't use key_suffix since // it is not part of the primary key. We use key_sha256 instead. #[derive(Clone)] pub struct SqlKey { pub prefix: Vec<u8>, pub sha256: Vec<u8>, } impl SqlKey { pub fn min_with_same_prefix(key: Vec<u8>) -> Self { let key = SplitKey::new(key); Self { prefix: key.prefix, sha256: MIN_SHA256.to_vec(), } } pub fn max_with_same_prefix(key: Vec<u8>) -> Self { let key = SplitKey::new(key); Self { prefix: key.prefix, sha256: MAX_SHA256.to_vec(), } } } // Translates a range to a SqlKey bounds we can use to get records in that // range. Note that because the SqlKey does not sort the same way as IndexKey // for very long keys, the returned range might contain extra keys that needs to // be filtered application side. pub fn to_sql_bounds(interval: Interval) -> (std::ops::Bound<SqlKey>, std::ops::Bound<SqlKey>) { use std::ops::Bound; let lower = match interval.start { StartIncluded(key) => { // This can potentially include more results than needed. Bound::Included(SqlKey::min_with_same_prefix(key.into())) }, }; let upper = match interval.end { End::Excluded(key) => { if key.len() < MAX_INDEX_KEY_PREFIX_LEN { Bound::Excluded(SqlKey::min_with_same_prefix(key.into())) } else { // We can't exclude the bound without potentially excluding other // keys that fall within the range. Bound::Included(SqlKey::max_with_same_prefix(key.into())) } }, End::Unbounded => Bound::Unbounded, }; (lower, upper) } pub fn index_query( index_id: IndexId, read_timestamp: Timestamp, lower: std::ops::Bound<SqlKey>, upper: std::ops::Bound<SqlKey>, order: Order, batch_size: usize, multitenant: bool, instance_name: &MySqlInstanceName, ) -> (&'static str, Vec<mysql_async::Value>) { use std::ops::Bound; let mut params = vec![]; fn internal_id_param(id: common::document::InternalId) -> Vec<u8> { id.into() } let mut map_bound = |b: Bound<SqlKey>| -> BoundType { match b { Bound::Unbounded => BoundType::Unbounded, Bound::Excluded(sql_key) => { params.push(sql_key.prefix.clone()); params.push(sql_key.prefix); params.push(sql_key.sha256); BoundType::Excluded }, Bound::Included(sql_key) => { params.push(sql_key.prefix.clone()); params.push(sql_key.prefix); params.push(sql_key.sha256); BoundType::Included }, } }; let lt = map_bound(lower); let ut = map_bound(upper); let query = index_queries(multitenant).get(&(lt, ut, order)).unwrap(); // Substitutions are {where_clause}, ts, {where_clause}, ts, limit. let mut all_params = vec![]; if multitenant { all_params.push((&instance_name.raw).into()); } all_params.push(internal_id_param(index_id).into()); all_params.push(i64::from(read_timestamp).into()); for param in params { all_params.push(param.into()); } all_params.push((batch_size as i64).into()); (query, all_params) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server