Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs (24.5 kB)
use std::{ collections::{ BTreeMap, BTreeSet, }, sync::Arc, time::Duration, }; use ::metrics::StatusTimer; use anyhow::Context; use common::{ backoff::Backoff, bootstrap_model::schema::SchemaState, errors::report_error, persistence::LatestDocument, runtime::Runtime, schemas::DatabaseSchema, types::{ IndexId, RepeatableTimestamp, }, virtual_system_mapping::VirtualSystemMapping, }; use database::{ Database, IndexModel, SchemaModel, SchemaValidationProgressModel, Snapshot, Transaction, SCHEMAS_TABLE, }; use errors::ErrorMetadataAnyhowExt; use futures::{ pin_mut, Future, TryStreamExt, }; use keybroker::Identity; use metrics::{ log_document_bytes, log_document_validated, schema_validation_timer, }; use value::{ NamespacedTableMapping, ResolvedDocumentId, TableName, TableNamespace, TabletId, }; use crate::metrics::log_worker_starting; mod metrics; const INITIAL_BACKOFF: Duration = Duration::from_millis(10); const MAX_BACKOFF: Duration = Duration::from_secs(5); const INITIAL_COMMIT_BACKOFF: Duration = Duration::from_millis(10); const MAX_COMMIT_BACKOFF: Duration = Duration::from_secs(2); const MAX_COMMIT_FAILURES: u32 = 3; pub struct SchemaWorker<RT: Runtime> { runtime: RT, database: Database<RT>, } pub struct PendingSchemaValidation { namespace: TableNamespace, id: ResolvedDocumentId, timer: StatusTimer, table_mapping: NamespacedTableMapping, virtual_system_mapping: VirtualSystemMapping, db_schema: Arc<DatabaseSchema>, ts: RepeatableTimestamp, active_schema: Option<Arc<DatabaseSchema>>, by_id_indexes: BTreeMap<TabletId, IndexId>, } impl<RT: Runtime> SchemaWorker<RT> { pub fn start(runtime: RT, database: Database<RT>) -> impl Future<Output = ()> + Send { let worker = Self { runtime, database }; async move { tracing::info!("Starting SchemaWorker"); let mut backoff = Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF); loop { if let Err(e) = worker.run().await { let delay = backoff.fail(&mut worker.runtime.rng()); report_error(&mut e.context("SchemaWorker died")).await; 
tracing::error!("Schema worker failed, sleeping {delay:?}"); worker.runtime.wait(delay).await; } else { backoff.reset(); } } } } pub(crate) async fn pending_schema_validations( tx: &mut Transaction<RT>, ) -> anyhow::Result<Vec<PendingSchemaValidation>> { let mut pending_schema_work = Vec::new(); let namespaces: Vec<_> = tx.table_mapping().namespaces_for_name(&SCHEMAS_TABLE); for namespace in namespaces { if let Some((id, db_schema)) = SchemaModel::new(tx, namespace) .get_by_state(SchemaState::Pending) .await? { tracing::debug!("SchemaWorker found a pending schema and is validating it..."); let timer = schema_validation_timer(); let table_mapping = tx.table_mapping().namespace(namespace); let virtual_system_mapping = tx.virtual_system_mapping().clone(); let active_schema = SchemaModel::new(tx, namespace) .get_by_state(SchemaState::Active) .await? .map(|(_id, active_schema)| active_schema); let ts = tx.begin_timestamp(); let by_id_indexes = IndexModel::new(tx).by_id_indexes().await?; pending_schema_work.push(PendingSchemaValidation { namespace, id, timer, table_mapping, virtual_system_mapping, db_schema, ts, active_schema, by_id_indexes, }); } } Ok(pending_schema_work) } pub async fn run(&self) -> anyhow::Result<()> { let status = log_worker_starting("SchemaWorker"); let mut tx: Transaction<RT> = self.database.begin(Identity::system()).await?; let snapshot = self.database.snapshot(tx.begin_timestamp())?; let pending_validations = SchemaWorker::pending_schema_validations(&mut tx).await?; let token = tx.into_token()?; for pending_validation in pending_validations { // FIXME: Remove clone let db_schema = pending_validation.db_schema.clone(); let tables_to_validate = DatabaseSchema::tables_to_validate( &db_schema, pending_validation.active_schema.as_deref(), &pending_validation.table_mapping, &pending_validation.virtual_system_mapping, &|table_name| { snapshot .table_summary(pending_validation.namespace, table_name) .map(|t| t.inferred_type().clone()) }, )?; 
self.validate_tables(tables_to_validate, pending_validation) .await?; } drop(status); tracing::debug!("SchemaWorker waiting..."); let subscription = self.database.subscribe(token).await?; subscription.wait_for_invalidation().await; Ok(()) } async fn validate_tables( &self, tables_to_validate: BTreeSet<&TableName>, PendingSchemaValidation { namespace, id, timer, table_mapping, virtual_system_mapping, db_schema, ts, active_schema: _, by_id_indexes, }: PendingSchemaValidation, ) -> anyhow::Result<()> { tracing::info!("SchemaWorker: Tables to check: {:?}", tables_to_validate); let mut schema_validation_progress_tracker = SchemaValidationProgressTracker::new( self.database.clone(), namespace, tables_to_validate.clone().into_iter().cloned().collect(), id, ) .await?; let tablet_ids = tables_to_validate .into_iter() .map(|table_name| table_mapping.name_to_tablet()(table_name.clone())) .collect::<Result<Vec<_>, _>>()?; let mut table_iterator = self .database .table_iterator(ts, 1000) .multi(tablet_ids.clone()); for tablet_id in tablet_ids { let stream = table_iterator.stream_documents_in_table( tablet_id, *by_id_indexes.get(&tablet_id).ok_or_else(|| { anyhow::anyhow!("Failed to find id index for table id {tablet_id}") })?, None, ); { pin_mut!(stream); let table_name = table_mapping.tablet_name(tablet_id)?; while let Some(LatestDocument { value: doc, .. }) = stream.try_next().await? { log_document_validated(); log_document_bytes(doc.size()); // If we finish with an error, we should delete progress. In all the // mark_failed, mark_success or whatever methods. 
if let Err(schema_error) = db_schema.check_existing_document( &doc, table_name.clone(), &table_mapping, &virtual_system_mapping, ) { let mut backoff = Backoff::new(INITIAL_COMMIT_BACKOFF, MAX_COMMIT_BACKOFF); while backoff.failures() < MAX_COMMIT_FAILURES { let mut tx = self.database.begin_system().await?; SchemaModel::new(&mut tx, namespace) .mark_failed(id, schema_error.clone()) .await?; if let Err(e) = self .database .commit_with_write_source(tx, "schema_worker_mark_failed") .await { if e.is_occ() { let delay = backoff.fail(&mut self.runtime.rng()); tracing::error!( "Schema worker failed to commit ({e}), retrying after \ {delay:?}" ); self.runtime.wait(delay).await; } else { return Err(e); } } else { break; } } tracing::info!("Schema is invalid"); timer.finish_developer_error(); return Ok(()); } // Update schema validation progress periodically, when we hit the // threshold. let progress_exists = schema_validation_progress_tracker .record_document_validated() .await?; // Return early if progress does not exist - this means the schema // validation has been canceled either by a document update that does // not match the pending schema or by the submission of a new pending // schema. if !progress_exists { return Ok(()); } } } table_iterator.unregister_table(tablet_id)?; } schema_validation_progress_tracker .record_validation_finished() .await?; let mut tx = self.database.begin(Identity::system()).await?; if let Err(error) = SchemaModel::new(&mut tx, namespace) .mark_validated(id) .await { if error.is_bad_request() { timer.finish_developer_error(); } tracing::info!("Schema not marked valid"); return Err(error); } self.database .commit_with_write_source(tx, "schema_worker_mark_valid") .await?; tracing::info!("Schema is valid"); timer.finish(); Ok(()) } } /// Tracks progress of schema validation for the tables that need to be /// validated, periodically writing progress to the /// `_schema_validation_progress` table for the given namespace and schema. 
struct SchemaValidationProgressTracker<RT: Runtime> { database: Database<RT>, namespace: TableNamespace, tables_to_validate: BTreeSet<TableName>, schema_id: ResolvedDocumentId, /// The threshold at which to write validation progress to the database. update_threshold: u64, /// The number of documents that have been validated since writing progress /// to the database. docs_validated: u64, } impl<RT: Runtime> SchemaValidationProgressTracker<RT> { pub async fn new( database: Database<RT>, namespace: TableNamespace, tables_to_validate: BTreeSet<TableName>, schema_id: ResolvedDocumentId, ) -> anyhow::Result<Self> { let mut tx = database.begin(Identity::system()).await?; let snapshot = database.snapshot(tx.begin_timestamp())?; let total_docs = Self::_total_docs(&snapshot, &tables_to_validate, namespace)?; let mut model = SchemaValidationProgressModel::new(&mut tx, namespace); model .initialize_schema_validation_progress(schema_id, total_docs) .await?; database .commit_with_write_source(tx, "schema_validation_tracker_initialized") .await?; // Update schema validation progress every 5% or 500 documents, whichever is // lower, to slowing down schema validation with too many writes. 
let update_threshold = total_docs .map(|total| std::cmp::min(500, (total as f64 * 0.05).ceil() as u64)) .unwrap_or(500); Ok(Self { database, namespace, tables_to_validate, schema_id, update_threshold, docs_validated: 0, }) } fn total_docs_at_ts(&self, ts: RepeatableTimestamp) -> anyhow::Result<Option<u64>> { let snapshot = self.database.snapshot(ts)?; Self::_total_docs(&snapshot, &self.tables_to_validate, self.namespace) } fn _total_docs( snapshot: &Snapshot, tables_to_validate: &BTreeSet<TableName>, namespace: TableNamespace, ) -> anyhow::Result<Option<u64>> { let total_docs = if snapshot.table_summaries.is_some() { let doc_counts = tables_to_validate .iter() .map(|table_name| { anyhow::Ok( snapshot .table_summary(namespace, table_name) .context( "Failed to retrieve table summary when table summaries were \ present", )? .num_values(), ) }) .try_collect::<Vec<_>>()?; Some(doc_counts.iter().sum()) } else { None }; Ok(total_docs) } /// Records that a document has been validated, writing to the db iff if we /// have hit the update threshold, otherwise tracking progress in memory. async fn record_document_validated(&mut self) -> anyhow::Result<bool> { self.docs_validated += 1; if self.docs_validated % self.update_threshold != 0 { return Ok(true); } tracing::debug!( "Updating schema validation progress with docs_validated: {}, update threshold: {}", self.docs_validated, self.update_threshold ); let mut tx = self.database.begin_system().await?; let total_docs = self.total_docs_at_ts(tx.begin_timestamp())?; let mut model = SchemaValidationProgressModel::new(&mut tx, self.namespace); let progress_exists = model .update_schema_validation_progress(self.schema_id, self.docs_validated, total_docs) .await?; self.database .commit_with_write_source(tx, "schema_validation_progress_updated") .await?; self.docs_validated = 0; Ok(progress_exists) } /// Flushes the remaining schema validation progress to the database after /// schema validation is finished. 
async fn record_validation_finished(self) -> anyhow::Result<()> { tracing::debug!( "Finalizing schema validation progress with docs_validated: {}", self.docs_validated ); let mut tx = self.database.begin_system().await?; let total_docs = self.total_docs_at_ts(tx.begin_timestamp())?; let mut model = SchemaValidationProgressModel::new(&mut tx, self.namespace); model .update_schema_validation_progress(self.schema_id, self.docs_validated, total_docs) .await?; self.database .commit_with_write_source(tx, "schema_validation_progress_finished") .await?; Ok(()) } } #[cfg(test)] mod tests { use common::{ assert_obj, bootstrap_model::schema::{ SchemaMetadata, SchemaState, }, db_schema, object_validator, schemas::{ validator::{ FieldValidator, Validator, }, DocumentSchema, }, }; use database::{ test_helpers::new_test_database, SchemaModel, SchemaValidationProgressModel, UserFacingModel, }; use keybroker::Identity; use maplit::btreeset; use runtime::testing::TestRuntime; use value::{ TableName, TableNamespace, }; use super::SchemaWorker; #[convex_macro::test_runtime] async fn test_schema_validation(rt: TestRuntime) -> anyhow::Result<()> { let db = new_test_database(rt.clone()).await; let schema_worker = SchemaWorker { runtime: rt.clone(), database: db.clone(), }; let mut tx = db.begin_system().await?; let table_name = "table".parse::<TableName>()?; let db_schema = db_schema!(table_name => DocumentSchema::Any); let (id, _) = SchemaModel::new_root_for_test(&mut tx) .submit_pending(db_schema) .await?; // Insert a document that matches the schema UserFacingModel::new_root_for_test(&mut tx) .insert(table_name.clone(), assert_obj!()) .await?; db.commit(tx).await?; // Check that the schema passes and is validated schema_worker.run().await?; let mut tx = db.begin(Identity::system()).await?; let doc = tx.get(id).await?.unwrap(); let schema: SchemaMetadata = doc.into_value().into_value().try_into()?; assert_eq!(schema.state, SchemaState::Validated); // Check that schema validation 
progress is written let mut model = SchemaValidationProgressModel::new(&mut tx, TableNamespace::test_user()); let progress = model .existing_schema_validation_progress(id) .await? .unwrap(); assert_eq!(progress.num_docs_validated, 0); // Doesn't need to validate any documents because the schema matches all // documents assert_eq!(progress.total_docs, Some(0)); // Insert a new schema that doesn't match the documents. It should fail! let db_schema = db_schema!(table_name => DocumentSchema::Union(vec![object_validator!("field" => FieldValidator::required_field_type(Validator::Int64))]), ); let (bad_schema_id, state) = SchemaModel::new_root_for_test(&mut tx) .submit_pending(db_schema) .await?; assert_eq!(state, SchemaState::Pending); db.commit(tx).await?; schema_worker.run().await?; let mut tx = db.begin(Identity::system()).await?; let doc = tx.get(id).await?.unwrap(); let schema: SchemaMetadata = doc.into_value().into_value().try_into()?; assert_eq!(schema.state, SchemaState::Overwritten); let doc = tx.get(bad_schema_id).await?.unwrap(); let schema: SchemaMetadata = doc.into_value().into_value().try_into()?; assert!(matches!(schema.state, SchemaState::Failed { .. })); // Progress should be deleted when schema is marked as failed. 
let mut model = SchemaValidationProgressModel::new(&mut tx, TableNamespace::test_user()); let progress = model.existing_schema_validation_progress(id).await?; assert!(progress.is_none()); Ok(()) } #[convex_macro::test_runtime] async fn test_schema_validation_progress_deleted_when_schema_marked_failed( rt: TestRuntime, ) -> anyhow::Result<()> { let db = new_test_database(rt.clone()).await; let schema_worker = SchemaWorker { runtime: rt.clone(), database: db.clone(), }; let mut tx = db.begin_system().await?; let table_name = "table".parse::<TableName>()?; let db_schema = db_schema!(table_name => DocumentSchema::Union(vec![])); let (id, _) = SchemaModel::new_root_for_test(&mut tx) .submit_pending(db_schema) .await?; db.commit(tx).await?; schema_worker.run().await?; // Check that schema validation progress is written let mut tx = db.begin_system().await?; let mut model = SchemaValidationProgressModel::new(&mut tx, TableNamespace::test_user()); let progress = model .existing_schema_validation_progress(id) .await? 
.unwrap(); assert_eq!(progress.num_docs_validated, 0); // Doesn't need to validate any documents because the schema matches all // documents assert_eq!(progress.total_docs, Some(0)); // Insert a document that does not match the schema UserFacingModel::new_root_for_test(&mut tx) .insert(table_name.clone(), assert_obj!()) .await?; let mut model = SchemaValidationProgressModel::new(&mut tx, TableNamespace::test_user()); let progress = model.existing_schema_validation_progress(id).await?; assert!(progress.is_none()); Ok(()) } #[convex_macro::test_runtime] async fn test_schema_validation_progress_deleted_when_schema_marked_active( rt: TestRuntime, ) -> anyhow::Result<()> { let db = new_test_database(rt.clone()).await; let schema_worker = SchemaWorker { runtime: rt.clone(), database: db.clone(), }; let mut tx = db.begin_system().await?; let table_name = "table".parse::<TableName>()?; let db_schema = db_schema!(table_name => DocumentSchema::Any); let (id, _) = SchemaModel::new_root_for_test(&mut tx) .submit_pending(db_schema) .await?; db.commit(tx).await?; schema_worker.run().await?; // Check that schema validation progress is written let mut tx = db.begin_system().await?; let mut model = SchemaValidationProgressModel::new(&mut tx, TableNamespace::test_user()); let progress = model .existing_schema_validation_progress(id) .await? 
.unwrap(); assert_eq!(progress.num_docs_validated, 0); // Doesn't need to validate any documents because the schema matches all // documents assert_eq!(progress.total_docs, Some(0)); // Marking a schema as active deletes the schema validation progress let mut model = SchemaModel::new_root_for_test(&mut tx); model.mark_active(id).await?; let mut model = SchemaValidationProgressModel::new(&mut tx, TableNamespace::test_user()); let progress = model.existing_schema_validation_progress(id).await?; assert!(progress.is_none()); Ok(()) } #[convex_macro::test_runtime] async fn test_schema_validation_progress_count(rt: TestRuntime) -> anyhow::Result<()> { let db = new_test_database(rt.clone()).await; let schema_worker = SchemaWorker { runtime: rt.clone(), database: db.clone(), }; let mut tx = db.begin_system().await?; let table_name = "table".parse::<TableName>()?; let db_schema = db_schema!(table_name => DocumentSchema::Any); let (id, _) = SchemaModel::new_root_for_test(&mut tx) .submit_pending(db_schema) .await?; let mut model = UserFacingModel::new_root_for_test(&mut tx); // Insert 21 documents to activate the update_threshold (so updates are not // written with each new document, but every 2 documents) let total_docs = 21; for _ in 0..total_docs { model.insert(table_name.clone(), assert_obj!()).await?; } db.commit(tx).await?; let mut tx = db.begin_system().await?; let pending_validation = SchemaWorker::pending_schema_validations(&mut tx) .await? .pop() .unwrap(); schema_worker .validate_tables(btreeset! { &table_name}, pending_validation) .await?; // Make sure the number of documents validated matches the total number of // documents let mut tx = db.begin_system().await?; let mut model = SchemaValidationProgressModel::new(&mut tx, TableNamespace::test_user()); let progress = model .existing_schema_validation_progress(id) .await? .unwrap(); assert_eq!(progress.num_docs_validated, total_docs); assert_eq!(progress.total_docs, Some(total_docs)); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.