Convex MCP server (official, by get-convex)

mod.rs
use std::{
    collections::BTreeMap,
    sync::{Arc, LazyLock},
};

use common::{
    components::CanonicalizedComponentFunctionPath,
    document::{ParseDocument, ParsedDocument},
    execution_context::ExecutionContext,
    knobs::{
        TRANSACTION_MAX_NUM_SCHEDULED,
        TRANSACTION_MAX_SCHEDULED_TOTAL_ARGUMENT_SIZE_BYTES,
    },
    maybe_val,
    query::{Expression, IndexRange, IndexRangeExpression, Order, Query},
    runtime::{Runtime, UnixTimestamp},
    types::{GenericIndexName, IndexName},
    virtual_system_mapping::VirtualSystemDocMapper,
};
use database::{unauthorized_error, ResolvedQuery, SystemMetadataModel, Transaction};
use errors::ErrorMetadata;
use maplit::btreemap;
use sync_types::Timestamp;
use value::{
    id_v6::DeveloperDocumentId,
    ConvexArray,
    ConvexValue,
    FieldPath,
    ResolvedDocumentId,
    Size,
    TableName,
    TableNamespace,
};

use self::{
    types::{ScheduledJob, ScheduledJobAttempts, ScheduledJobState},
    virtual_table::ScheduledJobsDocMapper,
};
use crate::{SystemIndex, SystemTable};

pub mod types;
pub mod virtual_table;

pub static SCHEDULED_JOBS_TABLE: LazyLock<TableName> = LazyLock::new(|| {
    "_scheduled_jobs"
        .parse()
        .expect("_scheduled_jobs is not a valid system table name")
});

pub static SCHEDULED_JOBS_VIRTUAL_TABLE: LazyLock<TableName> = LazyLock::new(|| {
    "_scheduled_functions"
        .parse()
        .expect("_scheduled_functions is not a valid virtual table name")
});

static SCHEDULED_JOBS_INDEX_BY_ID: LazyLock<IndexName> =
    LazyLock::new(|| GenericIndexName::by_id(SCHEDULED_JOBS_TABLE.clone()));
static SCHEDULED_JOBS_INDEX_BY_CREATION_TIME: LazyLock<IndexName> =
    LazyLock::new(|| GenericIndexName::by_creation_time(SCHEDULED_JOBS_TABLE.clone()));
static SCHEDULED_JOBS_VIRTUAL_INDEX_BY_ID: LazyLock<IndexName> =
    LazyLock::new(|| GenericIndexName::by_id(SCHEDULED_JOBS_VIRTUAL_TABLE.clone()));
static SCHEDULED_JOBS_VIRTUAL_INDEX_BY_CREATION_TIME: LazyLock<IndexName> =
    LazyLock::new(|| GenericIndexName::by_creation_time(SCHEDULED_JOBS_VIRTUAL_TABLE.clone()));

/// By next ts. Used to efficiently find the next jobs to execute.
pub static SCHEDULED_JOBS_INDEX: LazyLock<SystemIndex<ScheduledJobsTable>> =
    LazyLock::new(|| SystemIndex::new("by_next_ts", [&NEXT_TS_FIELD]).unwrap());

/// By udf path and next ts. Used by the dashboard to group scheduled jobs by
/// udf function.
pub static SCHEDULED_JOBS_INDEX_BY_UDF_PATH: LazyLock<SystemIndex<ScheduledJobsTable>> =
    LazyLock::new(|| {
        SystemIndex::new(
            "by_udf_path_and_next_event_ts",
            [&UDF_PATH_FIELD, &NEXT_TS_FIELD],
        )
        .unwrap()
    });
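// Illustrative note (not part of the original file): ordering `by_next_ts`
// ascending lets a worker pick up everything that is due by scanning the
// index up to "now". A sketch of such a range, reusing the query types that
// `cancel_all` below uses, with a hypothetical `cutoff: Timestamp`:
//
// let range = vec![IndexRangeExpression::Lt(
//     NEXT_TS_FIELD.clone(),
//     i64::from(cutoff).into(),
// )];
// let due_jobs = Query::index_range(IndexRange {
//     index_name: SCHEDULED_JOBS_INDEX.name(),
//     range,
//     order: Order::Asc,
// });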
/// By completed ts. Used to efficiently find jobs to garbage collect.
pub static SCHEDULED_JOBS_INDEX_BY_COMPLETED_TS: LazyLock<SystemIndex<ScheduledJobsTable>> =
    LazyLock::new(|| SystemIndex::new("by_completed_ts", [&COMPLETED_TS_FIELD]).unwrap());

pub static NEXT_TS_FIELD: LazyLock<FieldPath> =
    LazyLock::new(|| "nextTs".parse().expect("invalid nextTs field"));
pub static COMPLETED_TS_FIELD: LazyLock<FieldPath> =
    LazyLock::new(|| "completedTs".parse().expect("invalid completedTs field"));
static UDF_PATH_FIELD: LazyLock<FieldPath> =
    LazyLock::new(|| "udfPath".parse().expect("invalid udfPath field"));
static COMPONENT_PATH_FIELD: LazyLock<FieldPath> =
    LazyLock::new(|| "component".parse().expect("invalid component field"));

pub struct ScheduledJobsTable;

impl SystemTable for ScheduledJobsTable {
    type Metadata = ScheduledJob;

    fn table_name() -> &'static TableName {
        &SCHEDULED_JOBS_TABLE
    }

    fn indexes() -> Vec<SystemIndex<Self>> {
        vec![
            SCHEDULED_JOBS_INDEX_BY_COMPLETED_TS.clone(),
            SCHEDULED_JOBS_INDEX.clone(),
            SCHEDULED_JOBS_INDEX_BY_UDF_PATH.clone(),
        ]
    }

    fn virtual_table() -> Option<(
        &'static TableName,
        BTreeMap<IndexName, IndexName>,
        Arc<dyn VirtualSystemDocMapper>,
    )> {
        Some((
            &SCHEDULED_JOBS_VIRTUAL_TABLE,
            btreemap! {
                SCHEDULED_JOBS_VIRTUAL_INDEX_BY_CREATION_TIME.clone() =>
                    SCHEDULED_JOBS_INDEX_BY_CREATION_TIME.clone(),
                SCHEDULED_JOBS_VIRTUAL_INDEX_BY_ID.clone() =>
                    SCHEDULED_JOBS_INDEX_BY_ID.clone(),
            },
            Arc::new(ScheduledJobsDocMapper),
        ))
    }
}

// Maintains state for scheduling asynchronous functions (scheduled jobs).
pub struct SchedulerModel<'a, RT: Runtime> {
    tx: &'a mut Transaction<RT>,
    namespace: TableNamespace,
}

impl<'a, RT: Runtime> SchedulerModel<'a, RT> {
    pub fn new(tx: &'a mut Transaction<RT>, namespace: TableNamespace) -> Self {
        Self { tx, namespace }
    }

    fn check_scheduling_limits(&mut self, args: &ConvexArray) -> anyhow::Result<()> {
        // Limit how much you can schedule from a single transaction.
        anyhow::ensure!(
            self.tx.scheduled_size.num_writes < *TRANSACTION_MAX_NUM_SCHEDULED,
            ErrorMetadata::bad_request(
                "TooManyFunctionsScheduled",
                format!(
                    "Too many functions scheduled by this mutation (limit: {})",
                    *TRANSACTION_MAX_NUM_SCHEDULED,
                )
            )
        );
        self.tx.scheduled_size.num_writes += 1;
        anyhow::ensure!(
            self.tx.scheduled_size.size + args.size()
                <= *TRANSACTION_MAX_SCHEDULED_TOTAL_ARGUMENT_SIZE_BYTES,
            ErrorMetadata::bad_request(
                "ScheduledFunctionsArgumentsTooLarge",
                format!(
                    "Total size of the arguments of functions scheduled by this \
                     mutation is too large (limit: {} bytes)",
                    *TRANSACTION_MAX_SCHEDULED_TOTAL_ARGUMENT_SIZE_BYTES,
                )
            ),
        );
        self.tx.scheduled_size.size += args.size();
        Ok(())
    }
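    // Illustrative sketch (not part of the original file): a mutation that
    // schedules in a loop trips the first `ensure!` above once it crosses the
    // knob. With a hypothetical limit of 1000:
    //
    // for args in batches {
    //     // Fails with "TooManyFunctionsScheduled" on the 1001st call.
    //     model.schedule(path.clone(), args, ts, context.clone()).await?;
    // }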
    pub async fn schedule(
        &mut self,
        path: CanonicalizedComponentFunctionPath,
        args: ConvexArray,
        ts: UnixTimestamp,
        context: ExecutionContext,
    ) -> anyhow::Result<ResolvedDocumentId> {
        if path.udf_path.is_system()
            && !(self.tx.identity().is_admin() || self.tx.identity().is_system())
        {
            anyhow::bail!(unauthorized_error("schedule"))
        }
        self.check_scheduling_limits(&args)?;

        let now: Timestamp = self.tx.runtime().generate_timestamp()?;
        let original_scheduled_ts: Timestamp = ts.as_system_time().try_into()?;
        let scheduled_job = ScheduledJob::new(
            path.clone(),
            args.clone(),
            ScheduledJobState::Pending,
            // Don't set next_ts in the past, to avoid the scheduler incorrectly
            // logging that it is falling behind. We should keep
            // `original_scheduled_ts` intact since it is exposed to the
            // developer via the virtual table.
            Some(original_scheduled_ts.max(now)),
            None,
            original_scheduled_ts,
            ScheduledJobAttempts::default(),
        )?;

        let job = if let Some((parent_component_id, parent_scheduled_job)) =
            context.parent_scheduled_job
        {
            let table_mapping = self.tx.table_mapping().clone();
            let current_namespace = self.namespace;
            let parent_namespace = TableNamespace::from(parent_component_id);
            let mut get_parent_scheduled_job_state = async |namespace: TableNamespace| {
                let Ok(parent_scheduled_job) = parent_scheduled_job
                    .to_resolved(table_mapping.namespace(namespace).number_to_tablet())
                else {
                    return anyhow::Ok(None);
                };
                self.check_status(parent_scheduled_job).await
            };
            // Try both `parent_namespace` and `self.namespace` because there may be
            // version skew where `parent_namespace` is incorrectly the Root namespace.
            // TODO: once version skew is not an issue, only check `parent_namespace`.
            let parent_scheduled_job_state =
                match get_parent_scheduled_job_state(parent_namespace).await? {
                    Some(state) => Some(state),
                    None => get_parent_scheduled_job_state(current_namespace).await?,
                };
            if let Some(parent_scheduled_job_state) = parent_scheduled_job_state {
                match parent_scheduled_job_state {
                    ScheduledJobState::Pending
                    | ScheduledJobState::InProgress { .. }
                    | ScheduledJobState::Failed(_)
                    | ScheduledJobState::Success => scheduled_job,
                    // If the parent job was canceled, record this job as
                    // canceled too rather than running it.
                    ScheduledJobState::Canceled => {
                        let scheduled_ts = self.tx.begin_timestamp();
                        ScheduledJob::new(
                            path,
                            args,
                            ScheduledJobState::Canceled,
                            None,
                            Some(*scheduled_ts),
                            *scheduled_ts,
                            ScheduledJobAttempts::default(),
                        )?
                    },
                }
            } else {
                scheduled_job
            }
        } else {
            scheduled_job
        };

        let id = SystemMetadataModel::new(self.tx, self.namespace)
            .insert_metadata(&SCHEDULED_JOBS_TABLE, job.try_into()?)
            .await?;
        Ok(id)
    }

    pub async fn replace(
        &mut self,
        id: ResolvedDocumentId,
        job: ScheduledJob,
    ) -> anyhow::Result<()> {
        anyhow::ensure!(self
            .tx
            .table_mapping()
            .namespace(self.namespace)
            .tablet_matches_name(id.tablet_id, &SCHEDULED_JOBS_TABLE));
        SystemMetadataModel::new(self.tx, self.namespace)
            .replace(id, job.try_into()?)
            .await?;
        Ok(())
    }

    pub async fn complete(
        &mut self,
        id: ResolvedDocumentId,
        state: ScheduledJobState,
    ) -> anyhow::Result<()> {
        match state {
            ScheduledJobState::InProgress { .. } | ScheduledJobState::Pending => {
                anyhow::bail!("invalid state for completing a scheduled job")
            },
            ScheduledJobState::Canceled
            | ScheduledJobState::Failed(_)
            | ScheduledJobState::Success => {},
        }
        let Some(job) = self.tx.get(id).await? else {
            anyhow::bail!("scheduled job not found")
        };
        let job: ParsedDocument<ScheduledJob> = job.parse()?;
        match job.state {
            ScheduledJobState::Pending | ScheduledJobState::InProgress { .. } => {},
            ScheduledJobState::Canceled => {
                // If the job is already canceled, completing it is a no-op. We
                // should proceed without throwing an error.
                return Ok(());
            },
            ScheduledJobState::Failed(_) | ScheduledJobState::Success => {
                anyhow::bail!(
                    "Scheduled job cannot be completed because it is in state {:?}",
                    job.state
                )
            },
        }
        let mut job: ScheduledJob = job.into_value();
        job.state = state;
        // Remove next_ts and set completed_ts so the scheduler knows that the
        // job has already been processed.
        job.next_ts = None;
        job.completed_ts = Some(*self.tx.begin_timestamp());
        SystemMetadataModel::new(self.tx, self.namespace)
            .replace(id, job.try_into()?)
            .await?;
        Ok(())
    }
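    // Illustrative note (not part of the original file): `complete` enforces
    // a one-way transition. A Pending or InProgress job may move to Success,
    // Failed, or Canceled; once terminal, the state is sticky. E.g., for a
    // hypothetical `job_id`:
    //
    // model.complete(job_id, ScheduledJobState::Success).await?;
    // // Completing again now errors, unless the job had been Canceled, in
    // // which case `complete` is a no-op.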
    /// Cancel a scheduled job if it is in the Pending or InProgress state.
    /// Otherwise, it has already been completed in another transaction.
    pub async fn cancel(&mut self, id: ResolvedDocumentId) -> anyhow::Result<()> {
        if let Some(scheduled_job) = self.check_status(id).await? {
            match scheduled_job {
                ScheduledJobState::Pending | ScheduledJobState::InProgress { .. } => {
                    self.complete(id, ScheduledJobState::Canceled).await?;
                },
                ScheduledJobState::Canceled
                | ScheduledJobState::Success
                | ScheduledJobState::Failed(_) => {},
            }
        } else {
            tracing::error!("Tried to cancel a job with unknown state: {}", id)
        }
        Ok(())
    }

    pub async fn delete(&mut self, id: ResolvedDocumentId) -> anyhow::Result<()> {
        anyhow::ensure!(self
            .tx
            .table_mapping()
            .namespace(self.namespace)
            .tablet_matches_name(id.tablet_id, &SCHEDULED_JOBS_TABLE));
        self.tx.delete_inner(id).await?;
        Ok(())
    }

    // Cancel up to `limit` jobs for the UDF and return how many were canceled.
    // Note: the caller will assume everything has been canceled if the result
    // is less than `limit`.
    pub async fn cancel_all(
        &mut self,
        path: Option<CanonicalizedComponentFunctionPath>,
        limit: usize,
        start_next_ts: Option<Timestamp>,
        end_next_ts: Option<Timestamp>,
    ) -> anyhow::Result<usize> {
        let index_query = match path {
            Some(path) => {
                let udf_path = path.udf_path;
                let component_path = path.component;
                let mut component_path_filter = Expression::Eq(
                    Expression::Field(COMPONENT_PATH_FIELD.clone()).into(),
                    Expression::Literal(maybe_val!(String::from(component_path.clone())))
                        .into(),
                );
                // Jobs in the root component may have no `component` field at
                // all, so match `undefined` as well.
                if component_path.is_root() {
                    component_path_filter = Expression::Or(vec![
                        component_path_filter,
                        Expression::Eq(
                            Expression::Field(COMPONENT_PATH_FIELD.clone()).into(),
                            Expression::Literal(maybe_val!(undefined)).into(),
                        ),
                    ]);
                }
                let range = vec![
                    IndexRangeExpression::Eq(
                        UDF_PATH_FIELD.clone(),
                        ConvexValue::try_from(udf_path.to_string())?.into(),
                    ),
                    IndexRangeExpression::Gte(
                        NEXT_TS_FIELD.clone(),
                        i64::from(start_next_ts.unwrap_or(Timestamp::MIN)).into(),
                    ),
                    IndexRangeExpression::Lt(
                        NEXT_TS_FIELD.clone(),
                        i64::from(end_next_ts.unwrap_or(Timestamp::MAX)).into(),
                    ),
                ];
                Query::index_range(IndexRange {
                    index_name: SCHEDULED_JOBS_INDEX_BY_UDF_PATH.name(),
                    range,
                    order: Order::Asc,
                })
                .filter(component_path_filter)
            },
            None => {
                let range = vec![
                    IndexRangeExpression::Gte(
                        NEXT_TS_FIELD.clone(),
                        i64::from(start_next_ts.unwrap_or(Timestamp::MIN)).into(),
                    ),
                    IndexRangeExpression::Lt(
                        NEXT_TS_FIELD.clone(),
                        i64::from(end_next_ts.unwrap_or(Timestamp::MAX)).into(),
                    ),
                ];
                Query::index_range(IndexRange {
                    index_name: SCHEDULED_JOBS_INDEX.name(),
                    range,
                    order: Order::Asc,
                })
            },
        };
        let mut query_stream = ResolvedQuery::new(self.tx, self.namespace, index_query)?;

        let mut count = 0;
        while count < limit
            && let Some(doc) = query_stream.next(self.tx, None).await?
        {
            self.cancel(doc.id()).await?;
            count += 1;
        }
        Ok(count)
    }

    pub async fn list(&mut self) -> anyhow::Result<Vec<ParsedDocument<ScheduledJob>>> {
        let scheduled_query = Query::full_table_scan(SCHEDULED_JOBS_TABLE.clone(), Order::Asc);
        let mut query_stream = ResolvedQuery::new(self.tx, self.namespace, scheduled_query)?;
        let mut scheduled_jobs = Vec::new();
        while let Some(job) = query_stream.next(self.tx, None).await? {
            let job: ParsedDocument<ScheduledJob> = job.parse()?;
            scheduled_jobs.push(job);
        }
        Ok(scheduled_jobs)
    }
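    // Illustrative sketch (not part of the original file): because a result
    // smaller than `limit` means nothing is left to cancel, a caller can
    // drain all matching jobs by looping with a hypothetical batch size:
    //
    // const BATCH: usize = 100;
    // while model.cancel_all(None, BATCH, None, None).await? == BATCH {}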
    /// Checks the status of the scheduled job. If it has been garbage
    /// collected and the scheduled job is no longer in the table, it returns
    /// None.
    pub async fn check_status(
        &mut self,
        job_id: ResolvedDocumentId,
    ) -> anyhow::Result<Option<ScheduledJobState>> {
        let state = self
            .tx
            .get(job_id)
            .await?
            .map(ParseDocument::<ScheduledJob>::parse)
            .transpose()?
            .map(|job| job.state.clone());
        Ok(state)
    }
}

/// Same as SchedulerModel but works with the respective virtual table instead
/// of the underlying system table.
pub struct VirtualSchedulerModel<'a, RT: Runtime> {
    tx: &'a mut Transaction<RT>,
    namespace: TableNamespace,
}

impl<'a, RT: Runtime> VirtualSchedulerModel<'a, RT> {
    pub fn new(tx: &'a mut Transaction<RT>, namespace: TableNamespace) -> Self {
        Self { tx, namespace }
    }

    pub async fn schedule(
        &mut self,
        path: CanonicalizedComponentFunctionPath,
        args: ConvexArray,
        ts: UnixTimestamp,
        context: ExecutionContext,
    ) -> anyhow::Result<DeveloperDocumentId> {
        let system_id = SchedulerModel::new(self.tx, self.namespace)
            .schedule(path, args, ts, context)
            .await?;
        self.tx
            .virtual_system_mapping()
            .system_resolved_id_to_virtual_developer_id(system_id)
    }

    pub async fn cancel(&mut self, virtual_id: DeveloperDocumentId) -> anyhow::Result<()> {
        let table_mapping = self.tx.table_mapping().clone();
        let system_id = self
            .tx
            .virtual_system_mapping()
            .virtual_id_v6_to_system_resolved_doc_id(self.namespace, &virtual_id, &table_mapping)?;
        SchedulerModel::new(self.tx, self.namespace)
            .cancel(system_id)
            .await
    }
}
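// ---------------------------------------------------------------------------
// Usage sketch (illustrative, not part of the original file). Assuming an
// open `Transaction` `tx`, a `TableNamespace` `namespace`, and `path`, `args`,
// `ts`, and `context` values constructed elsewhere, scheduling and canceling
// through the virtual table looks roughly like:
//
// let mut model = VirtualSchedulerModel::new(&mut tx, namespace);
// let virtual_id: DeveloperDocumentId = model.schedule(path, args, ts, context).await?;
// // ... later, possibly in another transaction:
// VirtualSchedulerModel::new(&mut tx, namespace)
//     .cancel(virtual_id)
//     .await?;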
