Convex MCP server
by get-convex

mod.rs
use std::{
    sync::Arc,
    time::Duration,
};

use common::{
    bootstrap_model::tables::{
        TableMetadata,
        TableState,
        TABLES_TABLE,
    },
    components::ComponentId,
    document::{
        CreationTime,
        ParseDocument,
        ParsedDocument,
        CREATION_TIME_FIELD_PATH,
        ID_FIELD_PATH,
    },
    errors::report_error,
    knobs::{
        MAX_EXPIRED_SNAPSHOT_AGE,
        MAX_IMPORT_AGE,
        MAX_SESSION_CLEANUP_DURATION,
        SESSION_CLEANUP_DELETE_CONCURRENCY,
        SYSTEM_TABLE_CLEANUP_CHUNK_SIZE,
        SYSTEM_TABLE_CLEANUP_FREQUENCY,
        SYSTEM_TABLE_ROWS_PER_SECOND,
    },
    query::{
        Expression,
        IndexRange,
        IndexRangeExpression,
        Order,
        Query,
    },
    runtime::{
        new_rate_limiter,
        RateLimiter,
        Runtime,
    },
    types::{
        IndexName,
        TableName,
    },
};
use database::{
    query::PaginationOptions,
    BootstrapComponentsModel,
    Database,
    ResolvedQuery,
    SystemMetadataModel,
    TableModel,
};
use futures::{
    Future,
    StreamExt,
    TryStreamExt,
};
use governor::Quota;
use keybroker::Identity;
use metrics::{
    log_exports_s3_cleanup,
    log_system_table_cleanup_rows,
    log_system_table_cursor_lag,
    system_table_cleanup_timer,
};
use model::{
    exports::ExportsModel,
    session_requests::SESSION_REQUESTS_TABLE,
};
use rand::Rng;
use storage::Storage;
use tokio_stream::wrappers::ReceiverStream;
use value::{
    ConvexValue,
    ResolvedDocumentId,
    TableNamespace,
    TabletId,
};

mod metrics;

static MAX_ORPHANED_TABLE_NAMESPACE_AGE: Duration = Duration::from_days(2);

/// Background worker that garbage-collects system tables: hidden tables left
/// behind by imports, table namespaces orphaned by incomplete pushes, expired
/// export snapshots, and old `_session_requests` rows.
pub struct SystemTableCleanupWorker<RT: Runtime> {
    database: Database<RT>,
    runtime: RT,
    exports_storage: Arc<dyn Storage>,
}

impl<RT: Runtime> SystemTableCleanupWorker<RT> {
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        runtime: RT,
        database: Database<RT>,
        exports_storage: Arc<dyn Storage>,
    ) -> impl Future<Output = ()> + Send {
        let mut worker = SystemTableCleanupWorker {
            database,
            runtime,
            exports_storage,
        };
        async move {
            if MAX_SESSION_CLEANUP_DURATION.is_none() {
                tracing::error!(
                    "Forcibly disabling system table cleanup, exiting SystemTableCleanupWorker..."
                );
                return;
            }
            loop {
                if let Err(e) = worker.run().await {
                    report_error(&mut e.context("SystemTableCleanupWorker died")).await;
                }
            }
        }
    }

    async fn run(&mut self) -> anyhow::Result<()> {
        tracing::info!("Starting SystemTableCleanupWorker");
        let rate_limiter = new_rate_limiter(
            self.runtime.clone(),
            Quota::per_second(*SYSTEM_TABLE_ROWS_PER_SECOND),
        );
        let mut session_requests_delete_cursor = None;
        loop {
            // Jitter the wait between deletion runs to even out load.
            let delay = SYSTEM_TABLE_CLEANUP_FREQUENCY.mul_f32(self.runtime.rng().random());
            self.runtime.wait(delay).await;

            self.cleanup_hidden_tables().await?;
            self.cleanup_orphaned_table_namespaces().await?;
            self.cleanup_expired_exports().await?;

            // _session_requests are used to make mutations idempotent.
            // We can delete them after they are old enough that the client that
            // created the mutation must be gone.
            let session_requests_cutoff = match *MAX_SESSION_CLEANUP_DURATION {
                Some(duration) => {
                    Some((*self.database.now_ts_for_reads().sub(duration)?).try_into()?)
                },
                None => None,
            };
            // Preserve the deletion cursor between runs. This helps skip index tombstones.
            // Note that we only update the cursor after a successful run.
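            // `cleanup_system_table` returns `(rows_deleted, cursor)`; destructuring
            // assignment writes the new cursor back for the next iteration.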
            (_, session_requests_delete_cursor) = self
                .cleanup_system_table(
                    TableNamespace::Global,
                    &SESSION_REQUESTS_TABLE,
                    session_requests_cutoff
                        .map_or(CreationTimeInterval::None, CreationTimeInterval::Before),
                    &rate_limiter,
                    *SESSION_CLEANUP_DELETE_CONCURRENCY,
                    session_requests_delete_cursor,
                )
                .await?;
        }
    }

    /// Delete tables stuck in the `Hidden` state (e.g. left behind by an
    /// import that never completed), once they are much older than any
    /// still-running import could be.
    async fn cleanup_hidden_tables(&self) -> anyhow::Result<()> {
        let mut tx = self.database.begin(Identity::system()).await?;
        let mut num_deleted = 0;
        let query = Query::full_table_scan(TABLES_TABLE.clone(), Order::Asc);
        let mut query_stream = ResolvedQuery::new(&mut tx, TableNamespace::Global, query.clone())?;
        {
            while let Some(document) = query_stream.next(&mut tx, None).await? {
                // Limit rows read and rows deleted to avoid hitting transaction size
                // limits.
                if query_stream.is_approaching_data_limit() || num_deleted > 1000 {
                    let cursor = query_stream.cursor();
                    self.database
                        .commit_with_write_source(tx, "system_table_cleanup")
                        .await?;
                    tracing::info!("Deleted {num_deleted} hidden tables");
                    num_deleted = 0;
                    tx = self.database.begin(Identity::system()).await?;
                    query_stream = ResolvedQuery::new_bounded(
                        &mut tx,
                        TableNamespace::Global,
                        query.clone(),
                        PaginationOptions::ManualPagination {
                            start_cursor: cursor,
                            maximum_rows_read: None,
                            maximum_bytes_read: None,
                        },
                        None,
                        database::query::TableFilter::IncludePrivateSystemTables,
                    )?;
                }
                let table: ParsedDocument<TableMetadata> = document.parse()?;
                match table.state {
                    TableState::Active | TableState::Deleting => {},
                    TableState::Hidden => {
                        let now = CreationTime::try_from(*self.database.now_ts_for_reads())?;
                        let creation_time = table.creation_time();
                        let age = Duration::from_millis(
                            (f64::from(now) - f64::from(creation_time)) as u64,
                        );
                        // Mark as deleting if hidden for more than twice the max import age.
                        if age > 2 * (*MAX_IMPORT_AGE) {
                            let table_id = TabletId(table.id().internal_id());
                            tracing::info!("Deleting hidden table: {table_id:?}");
                            TableModel::new(&mut tx)
                                .delete_hidden_table(table_id)
                                .await?;
                            num_deleted += 1;
                        }
                    },
                };
            }
        }
        if num_deleted > 0 {
            self.database
                .commit_with_write_source(tx, "system_table_cleanup")
                .await?;
            tracing::info!("Deleted {num_deleted} hidden tables");
        }
        Ok(())
    }

    /// Delete table namespaces that are not associated with any component.
    /// This can occur when a push does not complete successfully, where
    /// `start_push` initializes component system tables in a new namespace
    /// but `finish_push` never commits the component.
    async fn cleanup_orphaned_table_namespaces(&self) -> anyhow::Result<()> {
        let mut tx = self.database.begin(Identity::system()).await?;
        let ts = tx.begin_timestamp();
        let table_mapping = tx.table_mapping().clone();
        let component_paths = BootstrapComponentsModel::new(&mut tx).all_component_paths();
        let mut table_model = TableModel::new(&mut tx);
        const MAX_TABLES_PER_RUN: usize = 1024;
        let mut deleted_tables = 0;
        'cleanup: for (namespace, map) in table_mapping.iter_active_namespaces() {
            let component_id = ComponentId::from(*namespace);
            if component_paths.contains_key(&component_id) {
                continue;
            }
            for (table_name, tablet_id) in map.iter() {
                // Ensure user tables are empty before deleting.
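                // Deleting a non-empty user table would destroy user data, so bail
                // out with an error and leave the namespace untouched instead.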
                if !table_name.is_system() {
                    let count = table_model.must_count(*namespace, table_name).await?;
                    anyhow::ensure!(
                        count == 0,
                        "Non-system table {table_name} found with {count} documents in orphaned \
                         table namespace component id: {component_id:?}"
                    );
                }
                let table_metadata = table_model.get_table_metadata(*tablet_id).await?;
                let now = CreationTime::try_from(*ts)?;
                let creation_time = table_metadata.creation_time();
                let age =
                    Duration::from_millis((f64::from(now) - f64::from(creation_time)) as u64);
                if age > MAX_ORPHANED_TABLE_NAMESPACE_AGE {
                    tracing::info!(
                        "Deleting orphaned table {table_name:?} in non-existent component \
                         {component_id:?}"
                    );
                    table_model
                        .delete_active_table(*namespace, table_name.clone())
                        .await?;
                    deleted_tables += 1;
                    if deleted_tables >= MAX_TABLES_PER_RUN {
                        // Don't create an overly large transaction; we'll get
                        // to the remaining tables on the next run.
                        tracing::warn!(
                            "Hit the limit of {} tables, stopping early",
                            MAX_TABLES_PER_RUN
                        );
                        break 'cleanup;
                    }
                }
            }
        }
        self.database
            .commit_with_write_source(tx, "system_table_cleanup")
            .await?;
        Ok(())
    }

    /// Delete documents in `table` matching `to_delete`, reading id chunks and
    /// deleting them with `num_deleters` concurrent transactions. Returns the
    /// number of deleted documents and the final cursor.
    async fn cleanup_system_table(
        &self,
        namespace: TableNamespace,
        table: &TableName,
        to_delete: CreationTimeInterval,
        rate_limiter: &RateLimiter<RT>,
        num_deleters: usize,
        mut cursor: Option<(CreationTime, ResolvedDocumentId)>,
    ) -> anyhow::Result<(usize, Option<(CreationTime, ResolvedDocumentId)>)> {
        let _timer = system_table_cleanup_timer();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let deleter = |chunk: Vec<ResolvedDocumentId>| async {
            let deleted_chunk = self
                .cleanup_system_table_delete_chunk(namespace, table, chunk)
                .await?;
            for _ in 0..deleted_chunk {
                // Don't rate limit within transactions, because that would just increase
                // contention. Rate limit between transactions to limit
                // overall deletion speed.
                while let Err(not_until) = rate_limiter.check() {
                    let delay = not_until.wait_time_from(self.runtime.monotonic_now().into());
                    self.runtime.wait(delay).await;
                }
            }
            Ok(deleted_chunk)
        };
        let deleters = ReceiverStream::new(rx)
            .map(deleter)
            .buffer_unordered(num_deleters)
            .try_fold(0, |acc, x| async move { Ok(acc + x) });
        // Move a mutable reference (rather than the cursor value itself) into
        // the reader future, so cursor updates made while reading are still
        // visible when we return the cursor below.
        let cursor_ref = &mut cursor;
        let reader = async move {
            loop {
                let deleted_chunk = self
                    .cleanup_system_table_read_chunk(namespace, table, to_delete, cursor_ref)
                    .await?;
                if deleted_chunk.is_empty() {
                    return Ok::<_, anyhow::Error>(());
                }
                tx.send(deleted_chunk).await?;
            }
        };
        let ((), deleted) = futures::try_join!(reader, deleters)?;
        Ok((deleted, cursor))
    }

    async fn cleanup_system_table_read_chunk(
        &self,
        namespace: TableNamespace,
        table: &TableName,
        to_delete: CreationTimeInterval,
        cursor: &mut Option<(CreationTime, ResolvedDocumentId)>,
    ) -> anyhow::Result<Vec<ResolvedDocumentId>> {
        let mut tx = self.database.begin(Identity::system()).await?;
        if !TableModel::new(&mut tx).table_exists(namespace, table) {
            return Ok(vec![]);
        }
        if matches!(to_delete, CreationTimeInterval::None) {
            return Ok(vec![]);
        }
        let mut range = match to_delete {
            CreationTimeInterval::None => return Ok(vec![]),
            CreationTimeInterval::All => vec![],
            CreationTimeInterval::Before(cutoff) => vec![IndexRangeExpression::Lt(
                CREATION_TIME_FIELD_PATH.clone(),
                f64::from(cutoff).into(),
            )],
        };
        if let Some((creation_time, _id)) = cursor {
            // The semantics of the cursor mean that all documents <= cursor have been
            // deleted, but retention might not have run yet, so we skip over their
            // tombstones.
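            // Resume the scan at the cursor's creation time: `Gte` rather than `Gt`,
            // since multiple documents can share a creation time; the filter below
            // excludes already-processed documents at exactly this creation time.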
            range.push(IndexRangeExpression::Gte(
                CREATION_TIME_FIELD_PATH.clone(),
                f64::from(*creation_time).into(),
            ));
        }
        let mut index_scan = Query::index_range(IndexRange {
            index_name: IndexName::by_creation_time(table.clone()),
            range,
            order: Order::Asc,
        });
        if let Some((creation_time, id)) = cursor {
            index_scan = index_scan.filter(Expression::Or(vec![
                Expression::Neq(
                    Box::new(Expression::Field(CREATION_TIME_FIELD_PATH.clone())),
                    Box::new(Expression::Literal(
                        ConvexValue::from(f64::from(*creation_time)).into(),
                    )),
                ),
                Expression::Gt(
                    Box::new(Expression::Field(ID_FIELD_PATH.clone())),
                    Box::new(Expression::Literal(ConvexValue::from(*id).into())),
                ),
            ]));
        }
        index_scan = index_scan.limit(*SYSTEM_TABLE_CLEANUP_CHUNK_SIZE);
        let mut query = ResolvedQuery::new(&mut tx, namespace, index_scan)?;
        let mut docs = vec![];
        while let Some(document) = query.next(&mut tx, None).await? {
            docs.push(document.id());
            *cursor = Some((document.creation_time(), document.id()));
        }
        if let Some((creation_time, _id)) = cursor {
            log_system_table_cursor_lag(table, *creation_time);
        }
        Ok(docs)
    }

    async fn cleanup_system_table_delete_chunk(
        &self,
        namespace: TableNamespace,
        table: &TableName,
        docs: Vec<ResolvedDocumentId>,
    ) -> anyhow::Result<usize> {
        let mut tx = self.database.begin(Identity::system()).await?;
        let mut deleted_count = 0;
        for doc in docs {
            SystemMetadataModel::new(&mut tx, namespace)
                .delete(doc)
                .await?;
            deleted_count += 1;
        }
        if deleted_count == 0 {
            return Ok(0);
        }
        self.database
            .commit_with_write_source(tx, "system_table_cleanup")
            .await?;
        tracing::info!("deleted {deleted_count} documents from {table}");
        log_system_table_cleanup_rows(table, deleted_count);
        Ok(deleted_count)
    }

    async fn cleanup_expired_exports(&self) -> anyhow::Result<()> {
        let mut tx = self.database.begin(Identity::system()).await?;
        let object_keys_to_del = ExportsModel::new(&mut tx)
            .cleanup_expired(*MAX_EXPIRED_SNAPSHOT_AGE)
            .await?;
        let num_deleted = object_keys_to_del.len();
        for object_key in object_keys_to_del {
            self.exports_storage.delete_object(&object_key).await?;
            log_exports_s3_cleanup();
        }
        self.database
            .commit_with_write_source(tx, "system_table_cleanup")
            .await?;
        if num_deleted > 0 {
            tracing::info!("Deleted {num_deleted} expired snapshots");
        }
        Ok(())
    }
}

#[derive(Clone, Copy, Debug)]
enum CreationTimeInterval {
    #[allow(dead_code)]
    All,
    None,
    Before(CreationTime),
}

#[cfg(test)]
mod tests {
    use std::{
        num::NonZeroU32,
        sync::Arc,
        time::Duration,
    };

    use common::{
        document::CreationTime,
        identity::InertIdentity,
        runtime::{
            new_rate_limiter,
            Runtime,
        },
    };
    use database::test_helpers::DbFixtures;
    use governor::Quota;
    use keybroker::Identity;
    use model::{
        session_requests::{
            types::{
                SessionRequestOutcome,
                SessionRequestRecord,
            },
            SessionRequestModel,
            SESSION_REQUESTS_TABLE,
        },
        test_helpers::DbFixturesWithModel,
    };
    use runtime::testing::TestRuntime;
    use storage::LocalDirStorage;
    use sync_types::SessionId;
    use value::{
        ConvexValue,
        JsonPackedValue,
        TableNamespace,
    };

    use crate::system_table_cleanup::{
        CreationTimeInterval,
        SystemTableCleanupWorker,
    };

    async fn test_system_table_cleanup_helper(
        rt: TestRuntime,
        num_deleters: usize,
    ) -> anyhow::Result<()> {
        let DbFixtures { db, .. } = DbFixtures::new_with_model(&rt).await?;
        let exports_storage = Arc::new(LocalDirStorage::new(rt.clone())?);
        let worker = SystemTableCleanupWorker {
            database: db.clone(),
            runtime: rt.clone(),
            exports_storage: exports_storage.clone(),
        };
        let mut creation_times = vec![];
        for _ in 0..10 {
            let mut tx = db.begin_system().await?;
            SessionRequestModel::new(&mut tx)
                .record_session_request(
                    SessionRequestRecord {
                        session_id: SessionId::new(rt.new_uuid_v4()),
                        request_id: 0,
                        outcome: SessionRequestOutcome::Mutation {
                            result: JsonPackedValue::pack(ConvexValue::Null),
                            log_lines: vec![].into(),
                        },
                        identity: InertIdentity::System,
                    },
                    Identity::system(),
                )
                .await?;
            creation_times.push(*tx.begin_timestamp());
            db.commit(tx).await?;
            rt.advance_time(Duration::from_secs(1)).await;
        }
        let cutoff = CreationTime::try_from(creation_times[4])?;
        let rate_limiter =
            new_rate_limiter(rt.clone(), Quota::per_second(NonZeroU32::new(10).unwrap()));
        let (deleted, _cursor) = worker
            .cleanup_system_table(
                TableNamespace::Global,
                &SESSION_REQUESTS_TABLE,
                CreationTimeInterval::Before(cutoff),
                &rate_limiter,
                num_deleters,
                None,
            )
            .await?;
        assert_eq!(deleted, 3);
        let count = db
            .begin_system()
            .await?
            .count(TableNamespace::Global, &SESSION_REQUESTS_TABLE)
            .await?;
        assert_eq!(count, Some(7));
        Ok(())
    }

    #[convex_macro::test_runtime]
    async fn test_system_table_cleanup_1(rt: TestRuntime) -> anyhow::Result<()> {
        test_system_table_cleanup_helper(rt, 1).await
    }

    #[convex_macro::test_runtime]
    async fn test_system_table_cleanup_2(rt: TestRuntime) -> anyhow::Result<()> {
        test_system_table_cleanup_helper(rt, 2).await
    }

    #[convex_macro::test_runtime]
    async fn test_system_table_cleanup_8(rt: TestRuntime) -> anyhow::Result<()> {
        test_system_table_cleanup_helper(rt, 8).await
    }
}
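
For context: `SystemTableCleanupWorker::new` returns a long-running future rather than a handle, so the caller is expected to spawn it on its runtime. A minimal wiring sketch, assuming `runtime`, `database`, and `exports_storage` handles owned by the application (the spawn call is illustrative, not the repo's actual startup code):

// Illustrative only: spawn the cleanup loop in the background. `new` returns
// an `impl Future<Output = ()> + Send` that restarts the worker, logging the
// error, whenever `run` fails.
let cleanup = SystemTableCleanupWorker::new(runtime, database, exports_storage);
tokio::spawn(cleanup);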
