Skip to main content
Glama

Convex MCP server

Official
by get-convex
worker.rs14.1 kB
//! Background worker that drives snapshot exports / cloud backups: it watches
//! the exports system table, runs at most one export at a time, supports
//! resumption after restarts, and records usage/billing for completed exports.

use std::{
    sync::Arc,
    time::Duration,
};

use anyhow::Context;
use common::{
    self,
    backoff::Backoff,
    components::ComponentPath,
    document::{
        ParseDocument,
        ParsedDocument,
    },
    errors::report_error,
    execution_context::ExecutionId,
    runtime::Runtime,
    types::UdfIdentifier,
    RequestId,
};
use database::{
    Database,
    SystemMetadataModel,
};
use exports::{
    interface::ExportProvider,
    ExportComponents,
};
use futures::{
    Future,
    FutureExt,
};
use keybroker::Identity;
use model::exports::{
    types::{
        Export,
        ExportRequestor,
    },
    ExportsModel,
};
use storage::Storage;
use usage_tracking::{
    CallType,
    FunctionUsageTracker,
    StorageCallTracker,
    UsageCounter,
};
use value::ResolvedDocumentId;

use crate::{
    exports::metrics::log_export_failed,
    metrics::log_worker_starting,
};

// Exponential backoff bounds used both for worker crash-restarts (in `new`)
// and for per-export retry loops (in `export`).
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(900); // 15 minutes

/// Sentinel error used to unwind out of an export attempt when the export
/// document is deleted or transitions to `Export::Canceled`. `export` treats
/// this error as a clean (non-retried) exit rather than a failure.
#[derive(thiserror::Error, Debug)]
#[error("Export canceled")]
struct ExportCanceled;

/// Long-running worker owning the resources needed to produce an export:
/// database access, the destination storage for export artifacts, user file
/// storage, and the pluggable `ExportProvider` that does the heavy lifting.
pub struct ExportWorker<RT: Runtime> {
    pub(super) runtime: RT,
    pub(super) database: Database<RT>,
    // Destination storage where finished export objects live (queried for
    // object attributes after completion).
    pub(super) storage: Arc<dyn Storage>,
    // User file storage, handed to the export provider so user files can be
    // included in the export.
    pub(super) file_storage: Arc<dyn Storage>,
    pub(super) export_provider: Arc<dyn ExportProvider<RT>>,
    pub(super) backoff: Backoff,
    pub(super) usage_tracking: UsageCounter,
    pub(super) instance_name: String,
}

impl<RT: Runtime> ExportWorker<RT> {
    /// Constructs the worker and returns its run-forever future (hence the
    /// clippy allowance: `new` intentionally does not return `Self`).
    ///
    /// The returned future loops calling `run`; any error is reported and
    /// retried with exponential backoff, while a clean iteration resets the
    /// backoff. It never completes normally.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        runtime: RT,
        database: Database<RT>,
        storage: Arc<dyn Storage>,
        file_storage: Arc<dyn Storage>,
        export_provider: Arc<dyn ExportProvider<RT>>,
        usage_tracking: UsageCounter,
        instance_name: String,
    ) -> impl Future<Output = ()> + Send {
        let mut worker = Self {
            runtime,
            database,
            storage,
            file_storage,
            export_provider,
            backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF),
            usage_tracking,
            instance_name,
        };
        async move {
            loop {
                if let Err(e) = worker.run().await {
                    report_error(&mut e.context("ExportWorker died")).await;
                    let delay = worker.backoff.fail(&mut worker.runtime.rng());
                    worker.runtime.wait(delay).await;
                } else {
                    worker.backoff.reset();
                }
            }
        }
    }

    // Subscribe to the export table. If there is a requested export, start
    // an export and mark as in_progress. If there's an export job that didn't
    // finish (it's in_progress), restart that export.
    //
    // Invariant enforced here: at most one export may be requested or in
    // progress at any time. When neither exists, this blocks on a read-set
    // subscription until the exports table changes, then returns so the
    // caller's loop re-runs.
    pub async fn run(&mut self) -> anyhow::Result<()> {
        let mut tx = self.database.begin(Identity::system()).await?;
        let mut exports_model = ExportsModel::new(&mut tx);
        let export_requested = exports_model.latest_requested().await?;
        let export_in_progress = exports_model.latest_in_progress().await?;
        match (export_requested, export_in_progress) {
            (Some(_), Some(_)) => {
                anyhow::bail!("Can only have one export requested or in progress at once.")
            },
            (Some(export), None) => {
                tracing::info!("Export requested.");
                let _status = log_worker_starting("ExportWorker");
                let ts = self.database.now_ts_for_reads();
                // Transition Requested -> InProgress and persist that state
                // before doing any work, so a crash restarts the export via
                // the `(None, Some(_))` arm below.
                let in_progress_export = (*export).clone().in_progress(*ts)?;
                let in_progress_export_doc = SystemMetadataModel::new_global(&mut tx)
                    .replace(
                        export.id().to_owned(),
                        in_progress_export.clone().try_into()?,
                    )
                    .await?
                    .parse()?;
                self.database
                    .commit_with_write_source(tx, "export_worker_export_requested")
                    .await?;
                self.export(in_progress_export_doc).await?;
                return Ok(());
            },
            (None, Some(export)) => {
                tracing::info!("In progress export restarting...");
                let _status = log_worker_starting("ExportWorker");
                self.export(export).await?;
                return Ok(());
            },
            (None, None) => {
                tracing::info!("No exports requested or in progress.");
            },
        }
        // Nothing to do: wait until the data read above is invalidated.
        let token = tx.into_token()?;
        let subscription = self.database.subscribe(token).await?;
        subscription.wait_for_invalidation().await;
        Ok(())
    }

    /// Runs one export to completion, retrying failures forever with backoff.
    ///
    /// Returns `Ok(())` both on success and on cancellation (`ExportCanceled`
    /// is treated as a clean exit, not an error). Note the retry re-passes the
    /// same (possibly stale) document; `export_and_mark_complete` re-reads the
    /// live document state itself.
    async fn export(&mut self, export: ParsedDocument<Export>) -> anyhow::Result<()> {
        loop {
            match self.export_and_mark_complete(export.clone()).await {
                Ok(()) => {
                    return Ok(());
                },
                Err(mut e) => {
                    if e.is::<ExportCanceled>() {
                        tracing::info!("Export {} canceled", export.id());
                        return Ok(());
                    }
                    log_export_failed(&e);
                    report_error(&mut e).await;
                    let delay = self.backoff.fail(&mut self.runtime.rng());
                    tracing::error!("Export failed, retrying in {delay:?}");
                    self.runtime.wait(delay).await;
                },
            }
        }
    }

    /// Performs the export for an `InProgress` document: runs the export
    /// provider (resuming from a saved token when possible) while concurrently
    /// watching the document for cancellation, then marks the document
    /// `Completed` and records storage/database usage for billing.
    ///
    /// Errors with `ExportCanceled` if the document is deleted or canceled at
    /// any point; any other error is retryable by the caller.
    async fn export_and_mark_complete(
        &mut self,
        export: ParsedDocument<Export>,
    ) -> anyhow::Result<()> {
        let id = export.id();
        let Export::InProgress {
            format,
            requestor,
            resumption_token,
            ..
        } = export.into_value() else {
            anyhow::bail!(
                "export_and_mark_complete should only be called with an InProgress export"
            );
        };
        // Drop the rest of `export` to prevent accidentally using stale state
        let database_snapshot = self.database.latest_database_snapshot()?;
        let snapshot_ts = *database_snapshot.timestamp();
        let components = ExportComponents {
            runtime: self.runtime.clone(),
            database: database_snapshot,
            storage: self.storage.clone(),
            file_storage: self.file_storage.clone(),
            instance_name: self.instance_name.clone(),
        };
        // Helper: apply `f` to the current export document inside an
        // OCC-retried transaction. Bails with `ExportCanceled` if the document
        // is gone or already canceled, so progress writes can't resurrect a
        // canceled export. `f` must be `Clone` because the closure may be
        // retried on OCC conflicts.
        async fn modify_export<RT: Runtime>(
            database: &Database<RT>,
            id: ResolvedDocumentId,
            what: &'static str,
            f: impl FnOnce(Export) -> anyhow::Result<Export> + Send + Clone,
        ) -> anyhow::Result<()> {
            database
                .execute_with_occ_retries(
                    Identity::system(),
                    FunctionUsageTracker::new(),
                    what,
                    move |tx| {
                        let f = f.clone();
                        async move {
                            let export: ParsedDocument<Export> =
                                tx.get(id).await?.context(ExportCanceled)?.parse()?;
                            let export = export.into_value();
                            if let Export::Canceled { .. } = export {
                                anyhow::bail!(ExportCanceled);
                            }
                            SystemMetadataModel::new_global(tx)
                                .replace(id, f(export)?.try_into()?)
                                .await?;
                            Ok(())
                        }
                        .boxed()
                        .into()
                    },
                )
                .await?;
            Ok(())
        }
        // Callback for the provider to publish human-readable progress.
        let update_progress = |msg| {
            let database_ = self.database.clone();
            async move {
                tracing::info!("Export {id} progress: {msg}");
                modify_export(&database_, id, "export_worker_update_progress", |export| {
                    export.update_progress(msg)
                })
                .await?;
                Ok(())
            }
            .boxed()
        };
        // Callback for the provider to checkpoint a resumption token, allowing
        // a restarted worker to pick up mid-export.
        let save_resumption_token = |token| {
            let database_ = self.database.clone();
            async move {
                modify_export(
                    &database_,
                    id,
                    "export_worker_save_resumption_token",
                    |export| export.update_resumption_token(token),
                )
                .await?;
                Ok(())
            }
            .boxed()
        };
        let (object_key, usage) = {
            let export_future = async {
                if let Some(token) = resumption_token {
                    tracing::info!(?token, "Export {id} resuming...");
                    // Resume failures are reported but non-fatal; we fall
                    // through to a fresh export.
                    match self
                        .export_provider
                        .resume_export(self.instance_name.clone(), token, id, &update_progress)
                        .await
                    {
                        Ok(Some(result)) => return Ok(result),
                        Ok(None) => {},
                        Err(mut err) => {
                            report_error(&mut err).await;
                        },
                    }
                    tracing::warn!("Export failed to resume");
                    // If we couldn't resume, just start a new export. We don't
                    // bother deleting the resumption token here - just assume
                    // it'll get rewritten soon.
                }
                tracing::info!(%snapshot_ts, "Export {id} beginning...");
                self.export_provider
                    .export(
                        &components,
                        format,
                        requestor,
                        id,
                        &update_progress,
                        &save_resumption_token,
                    )
                    .await
            };
            tokio::pin!(export_future);
            let database_ = self.database.clone();
            // In parallel, monitor the export document to check for cancellation
            let monitor_export = async move {
                loop {
                    let mut tx = database_.begin_system().await?;
                    let Some(export) = tx.get(id).await? else {
                        tracing::warn!("Export {id} disappeared");
                        return Err(ExportCanceled.into());
                    };
                    let export: ParsedDocument<Export> = export.parse()?;
                    match *export {
                        Export::InProgress { .. } => (),
                        Export::Canceled { .. } => return Err(ExportCanceled.into()),
                        Export::Requested { .. }
                        | Export::Failed { .. }
                        | Export::Completed { .. } => {
                            anyhow::bail!("Export {id} is in unexpected state: {export:?}");
                        },
                    }
                    // Still in progress: block until the document changes,
                    // then re-check.
                    let token = tx.into_token()?;
                    let subscription = database_.subscribe(token).await?;
                    subscription.wait_for_invalidation().await;
                }
            };
            tokio::pin!(monitor_export);
            // Race the export against the cancellation monitor; whichever
            // finishes first decides the outcome (the monitor only ever
            // completes with an error).
            futures::future::select(export_future, monitor_export)
                .await
                .factor_first()
                .0?
        };
        // Export is done; mark it as such.
        tracing::info!("Export {id} completed");
        self.database
            .execute_with_occ_retries(
                Identity::system(),
                FunctionUsageTracker::new(),
                "export_worker_mark_complete",
                |tx| {
                    let object_key = object_key.clone();
                    async move {
                        let Some(export) = tx.get(id).await? else {
                            tracing::warn!("Export {id} disappeared");
                            return Err(ExportCanceled.into());
                        };
                        let export: ParsedDocument<Export> = export.parse()?;
                        // A cancellation that raced with completion wins:
                        // don't overwrite Canceled with Completed.
                        if let Export::Canceled { .. } = *export {
                            return Err(ExportCanceled.into());
                        }
                        let completed_export = export.into_value().completed(
                            snapshot_ts,
                            *tx.begin_timestamp(),
                            object_key,
                        )?;
                        SystemMetadataModel::new_global(tx)
                            .replace(id, completed_export.try_into()?)
                            .await?;
                        Ok(())
                    }
                    .boxed()
                    .into()
                },
            )
            .await?;
        let object_attributes = self
            .storage
            .get_object_attributes(&object_key)
            .await?
            .context("error getting export object attributes from S3")?;
        let tag = requestor.usage_tag().to_string();
        let call_type = match requestor {
            ExportRequestor::SnapshotExport => CallType::Export,
            ExportRequestor::CloudBackup => CallType::CloudBackup,
        };
        // Charge file bandwidth for the upload of the snapshot to exports storage
        usage
            .track_storage_ingress_size(ComponentPath::root(), tag.clone(), object_attributes.size)
            .await;
        // Charge database bandwidth accumulated during the export
        self.usage_tracking
            .track_call(
                UdfIdentifier::SystemJob(tag),
                ExecutionId::new(),
                RequestId::new(),
                call_type,
                true,
                usage.gather_user_stats(),
            )
            .await;
        Ok(())
    }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.