Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs14.7 kB
use std::sync::LazyLock; use anyhow::Context; use common::{ components::ComponentPath, document::{ ParseDocument, ParsedDocument, }, maybe_val, query::{ Expression, Order, Query, }, runtime::Runtime, types::FullyQualifiedObjectKey, }; use database::{ patch_value, ResolvedQuery, SystemMetadataModel, Transaction, }; use errors::ErrorMetadata; use sync_types::Timestamp; use types::ImportRequestor; use value::{ ConvexObject, ConvexValue, ResolvedDocumentId, TableName, TableNamespace, TabletId, }; use self::types::{ ImportFormat, ImportMode, ImportState, ImportTableCheckpoint, SnapshotImport, }; use crate::{ SystemIndex, SystemTable, }; pub mod types; pub static SNAPSHOT_IMPORTS_TABLE: LazyLock<TableName> = LazyLock::new(|| { "_snapshot_imports" .parse() .expect("Invalid built-in snapshot imports table") }); pub struct SnapshotImportsTable; impl SystemTable for SnapshotImportsTable { type Metadata = SnapshotImport; fn table_name() -> &'static TableName { &SNAPSHOT_IMPORTS_TABLE } fn indexes() -> Vec<SystemIndex<Self>> { vec![] } } pub struct SnapshotImportModel<'a, RT: Runtime> { tx: &'a mut Transaction<RT>, } impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> { pub fn new(tx: &'a mut Transaction<RT>) -> Self { Self { tx } } pub async fn get( &mut self, id: ResolvedDocumentId, ) -> anyhow::Result<Option<ParsedDocument<SnapshotImport>>> { anyhow::ensure!(self .tx .table_mapping() .namespace(TableNamespace::Global) .tablet_matches_name(id.tablet_id, SnapshotImportsTable::table_name())); match self.tx.get(id).await? { None => Ok(None), Some(doc) => Ok(Some(doc.parse()?)), } } pub async fn list(&mut self) -> anyhow::Result<Vec<ParsedDocument<SnapshotImport>>> { let value_query = Query::full_table_scan(SNAPSHOT_IMPORTS_TABLE.clone(), Order::Asc); let mut query_stream = ResolvedQuery::new(self.tx, TableNamespace::Global, value_query)?; let mut result = vec![]; while let Some(doc) = query_stream.next(self.tx, None).await? { let row: ParsedDocument<SnapshotImport> = doc.parse()?; result.push(row); } Ok(result) } pub async fn start_import( &mut self, format: ImportFormat, mode: ImportMode, component_path: ComponentPath, object_key: FullyQualifiedObjectKey, requestor: ImportRequestor, ) -> anyhow::Result<ResolvedDocumentId> { let snapshot_import = SnapshotImport { state: ImportState::Uploaded, format, mode, component_path, object_key: Ok(object_key), member_id: self.tx.identity().member_id(), checkpoints: None, requestor, }; let id = SystemMetadataModel::new_global(self.tx) .insert( SnapshotImportsTable::table_name(), snapshot_import.try_into()?, ) .await?; Ok(id) } pub async fn must_get_state(&mut self, id: ResolvedDocumentId) -> anyhow::Result<ImportState> { let state = self .get(id) .await? .context(ErrorMetadata::not_found( "ImportNotFound", format!("import {id} not found"), ))? .state .clone(); Ok(state) } async fn update_state( &mut self, id: ResolvedDocumentId, new_state: impl FnOnce(ImportState) -> ImportState, ) -> anyhow::Result<()> { let current_state = self.must_get_state(id).await?; let new_state = new_state(current_state.clone()); match (&current_state, &new_state) { (ImportState::Uploaded, ImportState::WaitingForConfirmation { .. }) | (ImportState::Uploaded, ImportState::Failed(..)) | (ImportState::WaitingForConfirmation { .. }, ImportState::InProgress { .. }) | (ImportState::WaitingForConfirmation { .. }, ImportState::Failed { .. }) | (ImportState::InProgress { .. }, ImportState::InProgress { .. }) | (ImportState::InProgress { .. }, ImportState::Completed { .. }) | (ImportState::InProgress { .. }, ImportState::Failed(..)) => {}, (..) => { anyhow::bail!("invalid import state transition {current_state:?} -> {new_state:?}") }, } SystemMetadataModel::new_global(self.tx) .patch( id, patch_value!("state" => Some(ConvexValue::Object(new_state.try_into()?)))?, ) .await?; Ok(()) } async fn update_checkpoints( &mut self, id: ResolvedDocumentId, update_checkpoints: impl FnOnce(&mut Vec<ImportTableCheckpoint>), ) -> anyhow::Result<()> { let mut import = self.get(id).await?.context(ErrorMetadata::not_found( "ImportNotFound", format!("import {id} not found"), ))?; let mut checkpoints = import.checkpoints.clone().unwrap_or_default(); update_checkpoints(&mut checkpoints); import.checkpoints = Some(checkpoints); SystemMetadataModel::new_global(self.tx) .replace(id, import.into_value().try_into()?) .await?; Ok(()) } pub async fn mark_waiting_for_confirmation( &mut self, id: ResolvedDocumentId, info_message: String, require_manual_confirmation: bool, new_checkpoints: Vec<ImportTableCheckpoint>, ) -> anyhow::Result<()> { self.update_state(id, move |_| ImportState::WaitingForConfirmation { info_message, require_manual_confirmation, }) .await?; self.update_checkpoints(id, move |checkpoints| { *checkpoints = new_checkpoints; }) .await } pub async fn confirm_import(&mut self, id: ResolvedDocumentId) -> anyhow::Result<()> { let current_state = self.must_get_state(id).await?; // No-op if the import is already in progress or finished since the CLI may // show a confirmation prompt when the import was confirmed in the dashboard. if matches!(current_state, ImportState::WaitingForConfirmation { .. }) { self.update_state(id, move |_| ImportState::InProgress { progress_message: "Importing".to_string(), checkpoint_messages: vec![], }) .await?; }; Ok(()) } pub async fn cancel_import(&mut self, id: ResolvedDocumentId) -> anyhow::Result<()> { let current_state = self.must_get_state(id).await?; match current_state { ImportState::Uploaded | ImportState::WaitingForConfirmation { .. } | ImportState::InProgress { .. } => { self.fail_import(id, "Import canceled".to_string()).await? }, ImportState::Completed { .. } => anyhow::bail!(ErrorMetadata::bad_request( "CannotCancelImport", "Cannot cancel an import that has completed" )), ImportState::Failed(_) => anyhow::bail!(ErrorMetadata::bad_request( "CannotCancelImport", "Cannot cancel an import that has failed" )), } Ok(()) } pub async fn complete_import( &mut self, id: ResolvedDocumentId, ts: Timestamp, num_rows_written: u64, ) -> anyhow::Result<()> { self.update_state(id, move |_| ImportState::Completed { ts, num_rows_written: num_rows_written as i64, }) .await } pub async fn fail_import( &mut self, id: ResolvedDocumentId, error_message: String, ) -> anyhow::Result<()> { self.update_state(id, move |_| ImportState::Failed(error_message)) .await } pub async fn checkpoint_tablet_created( &mut self, id: ResolvedDocumentId, component_path: &ComponentPath, table_name: &TableName, tablet_id: TabletId, ) -> anyhow::Result<()> { self.update_checkpoints(id, move |checkpoints| { if let Some(checkpoint) = checkpoints.iter_mut().find(|c| { c.component_path == *component_path && c.display_table_name == *table_name }) { checkpoint.tablet_id = Some(tablet_id); } }) .await } pub async fn get_table_checkpoint( &mut self, id: ResolvedDocumentId, component_path: &ComponentPath, display_table_name: &TableName, ) -> anyhow::Result<Option<ImportTableCheckpoint>> { let Some(import) = self.get(id).await? else { return Ok(None); }; let Some(checkpoints) = &import.checkpoints else { return Ok(None); }; Ok(checkpoints .iter() .find(|c| { c.component_path == *component_path && c.display_table_name == *display_table_name }) .cloned()) } pub async fn add_checkpoint_message( &mut self, id: ResolvedDocumentId, checkpoint_message: String, component_path: &ComponentPath, display_table_name: &TableName, num_rows_written: i64, ) -> anyhow::Result<()> { let mut noop = false; let noop_ = &mut noop; self.update_checkpoints(id, move |checkpoints| { if let Some(checkpoint) = checkpoints.iter_mut().find(|c| { c.component_path == *component_path && c.display_table_name == *display_table_name }) { if num_rows_written <= checkpoint.num_rows_written { *noop_ = true; return; } checkpoint.num_rows_written = num_rows_written; } }) .await?; self.update_state(id, move |state| { let (progress_message, mut checkpoint_messages) = match state { ImportState::InProgress { progress_message, checkpoint_messages, } => (progress_message, checkpoint_messages), _ => ("Importing".to_string(), vec![]), }; if !checkpoint_messages.contains(&checkpoint_message) { checkpoint_messages.push(checkpoint_message.clone()); } let progress_message = if noop { progress_message } else { checkpoint_message }; ImportState::InProgress { progress_message, checkpoint_messages, } }) .await } pub async fn update_progress_message( &mut self, id: ResolvedDocumentId, progress_message: String, component_path: &ComponentPath, display_table_name: &TableName, num_rows_written: i64, ) -> anyhow::Result<()> { let mut noop = false; let noop_ = &mut noop; self.update_checkpoints(id, move |checkpoints| { if let Some(checkpoint) = checkpoints.iter_mut().find(|c| { c.component_path == *component_path && c.display_table_name == *display_table_name }) { if checkpoint.num_rows_written > 0 && num_rows_written <= checkpoint.num_rows_written { *noop_ = true; return; } checkpoint.num_rows_written = num_rows_written; } }) .await?; if noop { return Ok(()); } self.update_state(id, move |state| { let checkpoint_messages = match state { ImportState::InProgress { progress_message: _, checkpoint_messages, } => checkpoint_messages, _ => vec![], }; ImportState::InProgress { progress_message, checkpoint_messages, } }) .await } pub async fn import_in_state( &mut self, import_state: ImportState, ) -> anyhow::Result<Option<ParsedDocument<SnapshotImport>>> { let import_state_type = ConvexObject::try_from(import_state)? .get("state") .context("should have state field")? .clone(); let query = common::query::Query::full_table_scan(SNAPSHOT_IMPORTS_TABLE.clone(), Order::Asc) .filter(Expression::Eq( // TODO(lee) change to use an index. Box::new(Expression::Field("state.state".parse()?)), Box::new(Expression::Literal(maybe_val!(import_state_type))), )); let mut query_stream = ResolvedQuery::new(self.tx, TableNamespace::Global, query)?; query_stream .next(self.tx, Some(1)) .await? .map(|doc| doc.parse()) .transpose() } } #[cfg(test)] mod tests { use anyhow::Context; use common::components::ComponentPath; use database::test_helpers::DbFixtures; use runtime::testing::TestRuntime; use super::types::ImportRequestor; use crate::{ snapshot_imports::{ types::{ ImportFormat, ImportMode, }, SnapshotImportModel, }, test_helpers::DbFixturesWithModel, }; #[convex_macro::test_runtime] async fn test_start_get_list(rt: TestRuntime) -> anyhow::Result<()> { let DbFixtures { db, .. } = DbFixtures::new_with_model(&rt).await?; let mut tx = db.begin_system().await?; let mut imports_model = SnapshotImportModel::new(&mut tx); let id = imports_model .start_import( ImportFormat::Zip, ImportMode::Replace, ComponentPath::root(), "objectkey".to_string().into(), ImportRequestor::SnapshotImport, ) .await?; let doc = imports_model.get(id).await?.context("Doc missing?")?; assert_eq!(imports_model.list().await?, vec![doc]); Ok(()) } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server