
Convex MCP server

Official
by get-convex
async_syscall.rs (58.4 kB)
#![allow(non_snake_case)]

use std::{
    collections::BTreeMap,
    marker::PhantomData,
    time::Duration,
};

use anyhow::Context;
use common::{
    bootstrap_model::components::handles::FunctionHandle,
    components::{
        CanonicalizedComponentFunctionPath,
        ComponentId,
        PublicFunctionPath,
        Reference,
        ResolvedComponentFunctionPath,
        Resource,
    },
    document::DeveloperDocument,
    execution_context::ExecutionContext,
    knobs::{
        MAX_REACTOR_CALL_DEPTH,
        MAX_SYSCALL_BATCH_SIZE,
        TRANSACTION_MAX_READ_SIZE_ROWS,
    },
    query::{
        Cursor,
        CursorPosition,
        Query,
    },
    query_journal::QueryJournal,
    runtime::{
        Runtime,
        UnixTimestamp,
    },
    types::{
        AllowedVisibility,
        PersistenceVersion,
        UdfType,
    },
    value::ConvexValue,
    version::Version,
};
use database::{
    query::{
        query_batch_next,
        PaginationOptions,
        TableFilter,
    },
    soft_data_limit,
    table_summary::table_summary_bootstrapping_error,
    BootstrapComponentsModel,
    DeveloperQuery,
    PatchValue,
    Transaction,
    UserFacingModel,
};
use deno_core::v8;
use errors::{
    ErrorMetadata,
    ErrorMetadataAnyhowExt,
};
use itertools::Itertools;
use keybroker::KeyBroker;
use model::{
    components::{
        handles::FunctionHandlesModel,
        ComponentsModel,
    },
    file_storage::{
        types::FileStorageEntry,
        BatchKey,
        FileStorageId,
    },
    scheduled_jobs::VirtualSchedulerModel,
    virtual_system_mapping,
};
use serde::{
    Deserialize,
    Serialize,
};
use serde_json::{
    json,
    Value as JsonValue,
};
use udf::{
    validation::{
        validate_schedule_args,
        ValidatedPathAndArgs,
    },
    FunctionOutcome,
    UdfOutcome,
};
use value::{
    heap_size::HeapSize,
    id_v6::DeveloperDocumentId,
    ConvexArray,
    ConvexObject,
    TableName,
};

use super::DatabaseUdfEnvironment;
use crate::{
    client::EnvironmentData,
    environment::{
        action::parse_name_or_reference,
        helpers::{
            check_table_name,
            parse_version,
            remove_rejected_before_execution,
            with_argument_error,
            ArgName,
        },
    },
    helpers::UdfArgsJson,
    isolate2::client::QueryId,
    metrics::{
        async_syscall_timer,
        log_component_get_user_identity,
        log_run_udf,
    },
};

pub struct PendingSyscall {
    pub name: String,
    pub args: JsonValue,
    pub resolver: v8::Global<v8::PromiseResolver>,
}

impl HeapSize for PendingSyscall {
    fn heap_size(&self) -> usize {
        self.name.heap_size() + self.args.heap_size()
    }
}

// Checks if the underlying table and the request's expectation for the table
// line up.
pub fn system_table_guard(name: &TableName, expect_system_table: bool) -> anyhow::Result<()> {
    if expect_system_table && !name.is_system() {
        return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
            "SystemTableError",
            "User tables cannot be accessed with db.system."
        )));
    } else if !expect_system_table && name.is_system() {
        return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
            "SystemTableError",
            "System tables can only be accessed with db.system."
        )));
    }
    Ok(())
}
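// Illustrative sketch (not part of the original source): `system_table_guard`
// in action. This assumes, as elsewhere in Convex, that system table names are
// underscore-prefixed and that `TableName` parses such names; the table names
// themselves are hypothetical.
#[cfg(test)]
mod system_table_guard_sketch {
    use super::*;

    #[test]
    fn rejects_mismatched_table_access() {
        let user_table: TableName = "messages".parse().unwrap();
        let system_table: TableName = "_scheduled_jobs".parse().unwrap();
        // Matching expectations pass...
        assert!(system_table_guard(&user_table, false).is_ok());
        assert!(system_table_guard(&system_table, true).is_ok());
        // ...and mismatches fail in both directions.
        assert!(system_table_guard(&user_table, true).is_err());
        assert!(system_table_guard(&system_table, false).is_err());
    }
}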
/// A batch of async syscalls that can run "in parallel", where they actually
/// execute in a batch for determinism, but as far as the js promises are
/// concerned, they're running in parallel.
/// This could conceivably run reads
/// (get/queryStreamNext/queryPage/storageGetUrl) all in a single batch.
/// We could also run inserts, deletes, patches, and replaces in a batch
/// together, disjoint from reads, as long as the affected IDs are disjoint.
/// For now, we only allow batches of `db.get`s and `db.query`s.
/// TODO(lee) implement other kinds of batches.
#[derive(Debug)]
pub enum AsyncSyscallBatch {
    Reads(Vec<AsyncRead>),
    StorageGetUrls(Vec<JsonValue>),
    Unbatched { name: String, args: JsonValue },
}

#[derive(Debug)]
pub enum AsyncRead {
    Get(JsonValue),
    QueryStreamNext(JsonValue),
}

impl AsyncSyscallBatch {
    pub fn new(name: String, args: JsonValue) -> Self {
        match &*name {
            "1.0/get" => Self::Reads(vec![AsyncRead::Get(args)]),
            "1.0/queryStreamNext" => Self::Reads(vec![AsyncRead::QueryStreamNext(args)]),
            "1.0/storageGetUrl" => Self::StorageGetUrls(vec![args]),
            _ => Self::Unbatched { name, args },
        }
    }

    pub fn can_push(&self, name: &str, _args: &JsonValue) -> bool {
        if self.len() >= *MAX_SYSCALL_BATCH_SIZE {
            return false;
        }
        match (self, name) {
            (Self::Reads(_), "1.0/get") => true,
            (Self::Reads(_), "1.0/queryStreamNext") => true,
            (Self::Reads(_), _) => false,
            (Self::StorageGetUrls(_), "1.0/storageGetUrl") => true,
            (Self::StorageGetUrls(_), _) => false,
            (Self::Unbatched { .. }, _) => false,
        }
    }

    pub fn push(&mut self, name: String, args: JsonValue) -> anyhow::Result<()> {
        match (&mut *self, &*name) {
            (Self::Reads(batch_args), "1.0/get") => batch_args.push(AsyncRead::Get(args)),
            (Self::Reads(batch_args), "1.0/queryStreamNext") => {
                batch_args.push(AsyncRead::QueryStreamNext(args))
            },
            (Self::StorageGetUrls(batch_args), "1.0/storageGetUrl") => {
                batch_args.push(args);
            },
            _ => anyhow::bail!("cannot push {name} onto {self:?}"),
        }
        Ok(())
    }

    pub fn name(&self) -> &str {
        match self {
            // 1.0/get is grouped in with 1.0/queryStreamNext.
            Self::Reads(_) => "1.0/queryStreamNext",
            Self::StorageGetUrls(_) => "1.0/storageGetUrl",
            Self::Unbatched { name, .. } => name,
        }
    }

    pub fn len(&self) -> usize {
        match self {
            Self::Reads(args) => args.len(),
            Self::StorageGetUrls(args) => args.len(),
            Self::Unbatched { .. } => 1,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

pub struct QueryManager<RT: Runtime> {
    next_id: u32,
    developer_queries: BTreeMap<u32, DeveloperQuery<RT>>,
}

impl<RT: Runtime> QueryManager<RT> {
    pub fn new() -> Self {
        Self {
            next_id: 0,
            developer_queries: BTreeMap::new(),
        }
    }

    pub fn put_developer(&mut self, query: DeveloperQuery<RT>) -> u32 {
        let id = self.next_id;
        self.developer_queries.insert(id, query);
        self.next_id += 1;
        id
    }

    pub fn take_developer(&mut self, id: u32) -> Option<DeveloperQuery<RT>> {
        self.developer_queries.remove(&id)
    }

    pub fn insert_developer(&mut self, id: u32, query: DeveloperQuery<RT>) {
        self.developer_queries.insert(id, query);
    }

    pub fn cleanup_developer(&mut self, id: u32) -> bool {
        self.developer_queries.remove(&id).is_some()
    }
}

pub enum ManagedQuery<RT: Runtime> {
    Pending {
        query: Query,
        version: Option<Version>,
    },
    Active(DeveloperQuery<RT>),
}
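// Illustrative sketch (not part of the original source): how the batching
// rules above compose. Consecutive `1.0/get` and `1.0/queryStreamNext`
// syscalls coalesce into a single `Reads` batch (up to
// `MAX_SYSCALL_BATCH_SIZE`), while a write such as `1.0/insert` cannot join
// it. The JSON arguments are hypothetical.
#[cfg(test)]
mod batching_sketch {
    use super::*;

    #[test]
    fn reads_batch_coalesces_gets_and_query_stream_next() {
        let mut batch = AsyncSyscallBatch::new("1.0/get".to_string(), json!({ "id": "abc" }));
        assert!(batch.can_push("1.0/queryStreamNext", &json!({ "queryId": 0 })));
        batch
            .push("1.0/queryStreamNext".to_string(), json!({ "queryId": 0 }))
            .unwrap();
        // A write syscall cannot join a read batch; it must start a new one.
        assert!(!batch.can_push("1.0/insert", &json!({})));
        assert_eq!(batch.len(), 2);
        assert_eq!(batch.name(), "1.0/queryStreamNext");
    }
}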
// Trait for allowing code reuse between `DatabaseUdfEnvironment` and isolate2.
#[allow(async_fn_in_trait)]
pub trait AsyncSyscallProvider<RT: Runtime> {
    fn rt(&self) -> &RT;
    fn tx(&mut self) -> anyhow::Result<&mut Transaction<RT>>;
    fn key_broker(&self) -> &KeyBroker;
    fn context(&self) -> &ExecutionContext;
    fn observe_identity(&mut self) -> anyhow::Result<()>;
    fn persistence_version(&self) -> PersistenceVersion;
    fn is_system(&self) -> bool;
    fn table_filter(&self) -> TableFilter;
    fn component(&self) -> anyhow::Result<ComponentId>;

    fn log_async_syscall(&mut self, name: String, duration: Duration, is_success: bool);

    fn take_query(&mut self, query_id: QueryId) -> Option<ManagedQuery<RT>>;
    fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery<RT>);
    fn cleanup_query(&mut self, query_id: QueryId) -> bool;

    fn prev_journal(&mut self) -> &mut QueryJournal;
    fn next_journal(&mut self) -> &mut QueryJournal;

    async fn validate_schedule_args(
        &mut self,
        path: CanonicalizedComponentFunctionPath,
        args: Vec<JsonValue>,
        scheduled_ts: UnixTimestamp,
    ) -> anyhow::Result<(CanonicalizedComponentFunctionPath, ConvexArray)>;

    async fn file_storage_generate_upload_url(&mut self) -> anyhow::Result<String>;
    async fn file_storage_get_url_batch(
        &mut self,
        storage_ids: BTreeMap<BatchKey, FileStorageId>,
    ) -> BTreeMap<BatchKey, anyhow::Result<Option<String>>>;
    async fn file_storage_delete(&mut self, storage_id: FileStorageId) -> anyhow::Result<()>;
    async fn file_storage_get_entry(
        &mut self,
        storage_id: FileStorageId,
    ) -> anyhow::Result<Option<FileStorageEntry>>;

    async fn run_udf(
        &mut self,
        udf_type: UdfType,
        path: ResolvedComponentFunctionPath,
        args: ConvexObject,
    ) -> anyhow::Result<ConvexValue>;

    async fn create_function_handle(
        &mut self,
        path: CanonicalizedComponentFunctionPath,
    ) -> anyhow::Result<FunctionHandle>;

    async fn resolve(&mut self, reference: Reference) -> anyhow::Result<Resource>;

    async fn lookup_function_handle(
        &mut self,
        handle: FunctionHandle,
    ) -> anyhow::Result<CanonicalizedComponentFunctionPath>;
}

impl<RT: Runtime> AsyncSyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
    fn rt(&self) -> &RT {
        &self.phase.rt
    }

    fn tx(&mut self) -> anyhow::Result<&mut Transaction<RT>> {
        self.phase.tx()
    }

    fn component(&self) -> anyhow::Result<ComponentId> {
        self.phase.component()
    }

    fn key_broker(&self) -> &KeyBroker {
        &self.key_broker
    }

    fn context(&self) -> &ExecutionContext {
        &self.context
    }

    fn observe_identity(&mut self) -> anyhow::Result<()> {
        self.phase.observe_identity()
    }

    fn persistence_version(&self) -> PersistenceVersion {
        self.persistence_version
    }

    fn is_system(&self) -> bool {
        self.path.udf_path.is_system()
    }

    fn table_filter(&self) -> TableFilter {
        if self.path.udf_path.is_system() {
            TableFilter::IncludePrivateSystemTables
        } else {
            TableFilter::ExcludePrivateSystemTables
        }
    }

    fn log_async_syscall(&mut self, name: String, duration: Duration, is_success: bool) {
        self.syscall_trace
            .log_async_syscall(name, duration, is_success);
    }

    fn take_query(&mut self, query_id: QueryId) -> Option<ManagedQuery<RT>> {
        self.query_manager
            .take_developer(query_id)
            .map(ManagedQuery::Active)
    }

    fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery<RT>) {
        self.query_manager.insert_developer(query_id, query);
    }

    fn cleanup_query(&mut self, query_id: QueryId) -> bool {
        self.query_manager.cleanup_developer(query_id)
    }

    fn prev_journal(&mut self) -> &mut QueryJournal {
        &mut self.prev_journal
    }

    fn next_journal(&mut self) -> &mut QueryJournal {
        &mut self.next_journal
    }

    async fn validate_schedule_args(
        &mut self,
        path: CanonicalizedComponentFunctionPath,
        args: Vec<JsonValue>,
        scheduled_ts: UnixTimestamp,
    ) -> anyhow::Result<(CanonicalizedComponentFunctionPath, ConvexArray)> {
        validate_schedule_args(
            path,
            args,
            scheduled_ts,
            self.phase.unix_timestamp()?,
            self.phase.tx()?,
        )
        .await
    }

    async fn file_storage_generate_upload_url(&mut self) -> anyhow::Result<String> {
        let issued_ts = self.phase.unix_timestamp()?;
        let component = self.component()?;
        let post_url = self
            .file_storage
            .generate_upload_url(self.phase.tx()?, &self.key_broker, issued_ts, component)
            .await?;
        Ok(post_url)
    }

    async fn file_storage_get_url_batch(
        &mut self,
        storage_ids: BTreeMap<BatchKey, FileStorageId>,
    ) -> BTreeMap<BatchKey, anyhow::Result<Option<String>>> {
        let component = match self.component() {
            Ok(c) => c,
            Err(e) => {
                return storage_ids
                    .into_keys()
                    .map(|batch_key| (batch_key, Err(e.clone_error())))
                    .collect();
            },
        };
        let tx = match self.phase.tx() {
            Ok(tx) => tx,
            Err(e) => {
                return storage_ids
                    .into_keys()
                    .map(|batch_key| (batch_key, Err(e.clone_error())))
                    .collect();
            },
        };
        self.file_storage
            .get_url_batch(tx, component, storage_ids)
            .await
    }

    async fn file_storage_delete(&mut self, storage_id: FileStorageId) -> anyhow::Result<()> {
        let component = self.component()?;
        self.file_storage
            .delete(self.phase.tx()?, component.into(), storage_id)
            .await
    }

    async fn file_storage_get_entry(
        &mut self,
        storage_id: FileStorageId,
    ) -> anyhow::Result<Option<FileStorageEntry>> {
        let component = self.component()?;
        self.file_storage
            .get_file_entry(self.phase.tx()?, component.into(), storage_id)
            .await
    }

    #[fastrace::trace]
    async fn run_udf(
        &mut self,
        udf_type: UdfType,
        path: ResolvedComponentFunctionPath,
        args: ConvexObject,
    ) -> anyhow::Result<ConvexValue> {
        match (self.udf_type, udf_type) {
            // Queries can call other queries.
            (UdfType::Query, UdfType::Query) => (),
            // Mutations can call queries or mutations.
            (UdfType::Mutation, UdfType::Query | UdfType::Mutation) => (),
            _ => {
                anyhow::bail!(ErrorMetadata::bad_request(
                    "InvalidFunctionCall",
                    format!(
                        "Cannot call a {} function from a {} function",
                        udf_type, self.udf_type
                    )
                ));
            },
        }
        let tx = self.phase.tx()?;
        let called_component_id = path.component;
        let path_and_args_result = ValidatedPathAndArgs::new_with_returns_validator(
            AllowedVisibility::All,
            tx,
            PublicFunctionPath::ResolvedComponent(path.clone()),
            ConvexArray::try_from(vec![args.into()])?,
            udf_type,
        )
        .await?;
        let (path_and_args, returns_validator) = match path_and_args_result {
            Ok(r) => r,
            Err(e) => {
                // TODO: Propagate this JsError to user space correctly.
                anyhow::bail!(ErrorMetadata::bad_request("InvalidArgs", e.message));
            },
        };

        // NB: Since this is a user error, we need to do this check before we take the
        // transaction below.
        let new_reactor_depth = if matches!(udf_type, UdfType::Query | UdfType::Mutation) {
            if self.reactor_depth >= *MAX_REACTOR_CALL_DEPTH {
                anyhow::bail!(ErrorMetadata::bad_request(
                    "MaximumCallDepthExceeded",
                    "Cross component call depth limit exceeded. Do you have an infinite loop in \
                     your app?"
                ));
            }
            self.reactor_depth + 1
        } else {
            0
        };

        let mut tx = self.phase.take_tx()?;
        let tokens = tx.begin_subtransaction();
        let query_journal = if self.is_system() && udf_type == UdfType::Query {
            self.prev_journal.clone()
        } else {
            QueryJournal::new()
        };
        let (mut tx, outcome) = self
            .udf_callback
            .execute_udf(
                self.client_id.clone(),
                udf_type,
                path_and_args,
                EnvironmentData {
                    key_broker: self.key_broker.clone(),
                    default_system_env_vars: BTreeMap::new(),
                    file_storage: self.file_storage.clone(),
                    module_loader: self.phase.module_loader().clone(),
                },
                tx,
                query_journal,
                self.context.clone(),
                new_reactor_depth,
            )
            .await
            .map_err(remove_rejected_before_execution)?;
        match (udf_type, &outcome) {
            (UdfType::Mutation, FunctionOutcome::Mutation(UdfOutcome { result: Err(_), .. })) => {
                tx.rollback_subtransaction(tokens)?
            },
            _ => tx.commit_subtransaction(tokens)?,
        }
        self.phase.put_tx(tx)?;

        let outcome = match (udf_type, outcome) {
            (UdfType::Query, FunctionOutcome::Query(outcome))
            | (UdfType::Mutation, FunctionOutcome::Mutation(outcome)) => outcome,
            _ => anyhow::bail!("Unexpected outcome for {udf_type:?}"),
        };
        let UdfOutcome {
            result,
            observed_identity,
            // TODO: initialize the inner UDF's seed from the outer RNG seed
            observed_rng: _,
            // TODO: use the same timestamp for the inner UDF as the outer
            observed_time: _,
            // TODO: consider propagating syscall traces
            syscall_trace: _,
            log_lines,
            journal,
            arguments: _,
            identity: _,
            path: _,
            udf_server_version: _,
            unix_timestamp: _,
            rng_seed: _,
        } = outcome;
        log_run_udf(
            self.udf_type,
            udf_type,
            self.phase.observed_identity(),
            observed_identity,
        );
        if observed_identity {
            self.observe_identity()?;
        }
        if self.is_system() && udf_type == UdfType::Query && result.is_ok() {
            self.next_journal = journal;
        }
        // TODO(ENG-7663): restrict log lines within subfunctions instead of
        // limiting them only when they are returned to the parent.
        self.emit_sub_function_log_lines(path.for_logging(), log_lines);
        let result = match result {
            Ok(r) => r.unpack(),
            Err(e) => {
                // TODO: How do we want to propagate stack traces between component calls?
                anyhow::bail!(e);
            },
        };
        let tx = self.phase.tx()?;
        let table_mapping = tx.table_mapping().namespace(called_component_id.into());
        if let Some(e) =
            returns_validator.check_output(&result, &table_mapping, virtual_system_mapping())
        {
            anyhow::bail!(ErrorMetadata::bad_request("InvalidReturnValue", e.message));
        }
        Ok(result)
    }

    async fn create_function_handle(
        &mut self,
        path: CanonicalizedComponentFunctionPath,
    ) -> anyhow::Result<FunctionHandle> {
        let tx = self.phase.tx()?;
        FunctionHandlesModel::new(tx)
            .get_with_component_path(path)
            .await
    }

    async fn resolve(&mut self, reference: Reference) -> anyhow::Result<Resource> {
        let current_component_id = self.component()?;
        let current_udf_path = self.path.udf_path.clone().into();
        let tx = self.phase.tx()?;
        ComponentsModel::new(tx)
            .resolve(current_component_id, Some(current_udf_path), &reference)
            .await
    }

    async fn lookup_function_handle(
        &mut self,
        handle: FunctionHandle,
    ) -> anyhow::Result<CanonicalizedComponentFunctionPath> {
        FunctionHandlesModel::new(self.phase.tx()?)
            .lookup(handle)
            .await
    }
}
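// Illustrative sketch (not part of the original source): the caller/callee
// matrix that `run_udf` above enforces, written out as a standalone predicate.
// Queries may only call queries; mutations may call queries or mutations; any
// other combination is rejected with `InvalidFunctionCall`. The function name
// is hypothetical.
#[cfg(test)]
fn _can_call(caller: UdfType, callee: UdfType) -> bool {
    matches!(
        (caller, callee),
        (UdfType::Query, UdfType::Query)
            | (UdfType::Mutation, UdfType::Query | UdfType::Mutation)
    )
}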
/// These are syscalls that exist on `db` in `convex/server` for npm versions
/// >= 0.16.0. They expect DocumentIdv6 strings (as opposed to ID classes).
///
/// Most of the common logic lives on `Transaction` or
/// `DatabaseSyscallsShared`, and this is mostly just taking care of the
/// argument parsing.
pub struct DatabaseSyscallsV1<RT: Runtime, P: AsyncSyscallProvider<RT>> {
    _pd: PhantomData<(RT, P)>,
}

impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
    /// Runs a batch of syscalls, each of which can succeed or fail
    /// independently. The returned vec is the same length as the batch.
    #[fastrace::trace]
    pub async fn run_async_syscall_batch(
        provider: &mut P,
        batch: AsyncSyscallBatch,
    ) -> Vec<anyhow::Result<String>> {
        let start = provider.rt().monotonic_now();
        let batch_name = batch.name().to_string();
        let timer = async_syscall_timer(&batch_name);
        // Outer error is a system error that encompasses the whole batch, while
        // inner errors are for individual batch items that may be system or developer
        // errors.
        let results = match batch {
            AsyncSyscallBatch::Reads(batch_args) => Self::query_batch(provider, batch_args).await,
            AsyncSyscallBatch::StorageGetUrls(batch_args) => {
                Self::storage_get_url_batch(provider, batch_args).await
            },
            AsyncSyscallBatch::Unbatched { name, args } => {
                let result = match &name[..] {
                    // Database
                    "1.0/count" => Box::pin(Self::count(provider, args)).await,
                    "1.0/insert" => Box::pin(Self::insert(provider, args)).await,
                    "1.0/shallowMerge" => Box::pin(Self::shallow_merge(provider, args)).await,
                    "1.0/replace" => Box::pin(Self::replace(provider, args)).await,
                    "1.0/remove" => Box::pin(Self::remove(provider, args)).await,
                    "1.0/queryPage" => Box::pin(Self::query_page(provider, args)).await,
                    // Auth
                    "1.0/getUserIdentity" => {
                        Box::pin(Self::get_user_identity(provider, args)).await
                    },
                    // Storage
                    "1.0/storageDelete" => Box::pin(Self::storage_delete(provider, args)).await,
                    "1.0/storageGetMetadata" => {
                        Box::pin(Self::storage_get_metadata(provider, args)).await
                    },
                    "1.0/storageGenerateUploadUrl" => {
                        Box::pin(Self::storage_generate_upload_url(provider, args)).await
                    },
                    // Scheduling
                    "1.0/schedule" => Box::pin(Self::schedule(provider, args)).await,
                    "1.0/cancel_job" => Box::pin(Self::cancel_job(provider, args)).await,
                    // Components
                    "1.0/runUdf" => Box::pin(Self::run_udf(provider, args)).await,
                    "1.0/createFunctionHandle" => {
                        Box::pin(Self::create_function_handle(provider, args)).await
                    },
                    #[cfg(test)]
                    "slowSyscall" => {
                        std::thread::sleep(std::time::Duration::from_secs(1));
                        Ok(JsonValue::Number(1017.into()))
                    },
                    #[cfg(test)]
                    "reallySlowSyscall" => {
                        std::thread::sleep(std::time::Duration::from_secs(3));
                        Ok(JsonValue::Number(1017.into()))
                    },
                    _ => Err(ErrorMetadata::bad_request(
                        "UnknownAsyncOperation",
                        format!("Unknown async operation {name}"),
                    )
                    .into()),
                };
                vec![result]
            },
        };
        provider.log_async_syscall(
            batch_name,
            start.elapsed(),
            results.iter().all(|result| result.is_ok()),
        );
        timer.finish();
        results
            .into_iter()
            .map(|result| anyhow::Ok(serde_json::to_string(&result?)?))
            .collect()
    }

    #[convex_macro::instrument_future]
    async fn count(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct CountArgs {
            table: String,
        }
        let table = with_argument_error("db.count", || {
            let args: CountArgs = serde_json::from_value(args)?;
            args.table.parse().context(ArgName("table"))
        })?;
        let component = provider.component()?;
        let tx = provider.tx()?;
        let result = tx.count(component.into(), &table).await?;
        let Some(result) = result else {
            return Err(table_summary_bootstrapping_error(Some(
                "Table count unavailable while bootstrapping",
            )));
        };
        // Trim to u32 and check for overflow.
        let result = u32::try_from(result)?;
        // Return as f64, which converts to number type in Javascript.
        let result = f64::from(result);
        Ok(ConvexValue::from(result).to_internal_json())
    }

    #[convex_macro::instrument_future]
    async fn get_user_identity(provider: &mut P, _args: JsonValue) -> anyhow::Result<JsonValue> {
        provider.observe_identity()?;
        // TODO: Somehow make the Transaction aware of the dependency on the user.
        let tx = provider.tx()?;
        let user_identity = tx.user_identity();
        if !provider.component()?.is_root() {
            log_component_get_user_identity(user_identity.is_some());
        }
        if let Some(user_identity) = user_identity {
            return user_identity.try_into();
        }
        Ok(JsonValue::Null)
    }

    #[convex_macro::instrument_future]
    async fn storage_generate_upload_url(
        provider: &mut P,
        _args: JsonValue,
    ) -> anyhow::Result<JsonValue> {
        let post_url = provider.file_storage_generate_upload_url().await?;
        Ok(serde_json::to_value(post_url)?)
    }

    #[convex_macro::instrument_future]
    async fn storage_get_url_batch(
        provider: &mut P,
        batch_args: Vec<JsonValue>,
    ) -> Vec<anyhow::Result<JsonValue>> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct GetUrlArgs {
            storage_id: String,
        }
        let batch_size = batch_args.len();
        let mut results = BTreeMap::new();
        let mut storage_ids = BTreeMap::new();
        for (idx, args) in batch_args.into_iter().enumerate() {
            let storage_id_result = with_argument_error("storage.getUrl", || {
                let GetUrlArgs { storage_id } = serde_json::from_value(args)?;
                storage_id.parse().context(ArgName("storageId"))
            });
            match storage_id_result {
                Ok(storage_id) => {
                    storage_ids.insert(idx, storage_id);
                },
                Err(e) => {
                    assert!(results.insert(idx, Err(e)).is_none());
                },
            }
        }
        let urls = provider.file_storage_get_url_batch(storage_ids).await;
        for (batch_key, url) in urls {
            assert!(results
                .insert(batch_key, url.map(JsonValue::from))
                .is_none());
        }
        assert_eq!(results.len(), batch_size);
        results.into_values().collect()
    }

    #[convex_macro::instrument_future]
    async fn storage_delete(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct StorageDeleteArgs {
            storage_id: String,
        }
        let storage_id: FileStorageId = with_argument_error("storage.delete", || {
            let StorageDeleteArgs { storage_id } = serde_json::from_value(args)?;
            storage_id.parse().context(ArgName("storageId"))
        })?;

        // Synchronously delete the file from storage.
        provider.file_storage_delete(storage_id).await?;
        Ok(JsonValue::Null)
    }

    #[convex_macro::instrument_future]
    async fn storage_get_metadata(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct StorageGetMetadataArgs {
            storage_id: String,
        }
        let storage_id: FileStorageId = with_argument_error("storage.getMetadata", || {
            let StorageGetMetadataArgs { storage_id } = serde_json::from_value(args)?;
            storage_id.parse().context(ArgName("storageId"))
        })?;
        #[derive(Serialize)]
        #[serde(rename_all = "camelCase")]
        struct FileMetadataJson {
            storage_id: String,
            sha256: String,
            size: i64,
            content_type: Option<String>,
        }
        let file_metadata = provider.file_storage_get_entry(storage_id).await?.map(
            |FileStorageEntry {
                 storage_id,
                 storage_key: _, // internal field that we shouldn't return in syscalls
                 sha256,
                 size,
                 content_type,
             }| {
                FileMetadataJson {
                    storage_id: storage_id.to_string(),
                    // TODO(CX-5533) use base64 for consistency.
                    sha256: sha256.as_hex(),
                    size,
                    content_type,
                }
            },
        );
        Ok(serde_json::to_value(file_metadata)?)
    }

    #[convex_macro::instrument_future]
    async fn schedule(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct ScheduleArgs {
            name: Option<String>,
            reference: Option<String>,
            function_handle: Option<String>,
            ts: f64,
            args: UdfArgsJson,
        }
        let ScheduleArgs {
            name,
            reference,
            function_handle,
            ts,
            args,
        }: ScheduleArgs = with_argument_error("scheduler", || Ok(serde_json::from_value(args)?))?;
        let path = match function_handle {
            Some(h) => {
                let handle: FunctionHandle = with_argument_error("scheduler", || h.parse())?;
                provider.lookup_function_handle(handle).await?
            },
            None => {
                let reference = parse_name_or_reference("scheduler", name, reference)?;
                match provider.resolve(reference).await? {
                    Resource::Value(v) => {
                        anyhow::bail!(ErrorMetadata::bad_request(
                            "InvalidResource",
                            format!(
                                "Only functions can be scheduled. {} is not a function",
                                v.to_internal_json()
                            ),
                        ));
                    },
                    Resource::Function(p) => p,
                    Resource::ResolvedSystemUdf { .. } => {
                        anyhow::bail!("Cannot schedule function by component id");
                    },
                }
            },
        };
        let scheduling_component = provider.component()?;

        let scheduled_ts = with_argument_error("ts", || UnixTimestamp::from_secs_f64(ts))?;
        let (path, udf_args) = provider
            .validate_schedule_args(
                path,
                args.into_serialized_args()?.into_args()?,
                scheduled_ts,
            )
            .await?;
        let context = provider.context().clone();
        let tx = provider.tx()?;
        let virtual_id = VirtualSchedulerModel::new(tx, scheduling_component.into())
            .schedule(path, udf_args, scheduled_ts, context)
            .await?;
        Ok(JsonValue::from(virtual_id))
    }

    #[convex_macro::instrument_future]
    async fn cancel_job(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct CancelJobArgs {
            id: String,
        }
        let component = provider.component()?;
        let tx = provider.tx()?;
        let virtual_id_v6 = with_argument_error("db.cancel_job", || {
            let args: CancelJobArgs = serde_json::from_value(args)?;
            let id = DeveloperDocumentId::decode(&args.id).context(ArgName("id"))?;
            Ok(id)
        })?;
        VirtualSchedulerModel::new(tx, component.into())
            .cancel(virtual_id_v6)
            .await?;
        Ok(JsonValue::Null)
    }

    #[fastrace::trace]
    #[convex_macro::instrument_future]
    async fn insert(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct InsertArgs {
            table: String,
            value: JsonValue,
        }
        let (table, value) = with_argument_error("db.insert", || {
            let args: InsertArgs = serde_json::from_value(args)?;
            Ok((
                args.table.parse().context(ArgName("table"))?,
                ConvexValue::try_from(args.value)
                    .context(ArgName("value"))?
                    .try_into()
                    .context(ArgName("value"))?,
            ))
        })?;
        system_table_guard(&table, false)?;
        let component = provider.component()?;
        let tx = provider.tx()?;
        let document_id = UserFacingModel::new(tx, component.into())
            .insert(table, value)
            .await?;
        let id_str = document_id.encode();
        Ok(json!({ "_id": id_str }))
    }

    #[fastrace::trace]
    #[convex_macro::instrument_future]
    async fn shallow_merge(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct UpdateArgs {
            #[serde(default)]
            table: Option<String>,
            id: String,
            value: JsonValue,
        }
        let table_filter = provider.table_filter();
        let component = provider.component()?;
        let tx = provider.tx()?;
        let (id, value, table_name) = with_argument_error("db.patch", || {
            let args: UpdateArgs = serde_json::from_value(args)?;
            let id = DeveloperDocumentId::decode(&args.id).context(ArgName("id"))?;
            let actual_table_name = tx
                .resolve_idv6(id, component.into(), table_filter)
                .context(ArgName("id"))?;
            check_table_name(&args.table, &actual_table_name)?;
            let value = PatchValue::try_from(args.value).context(ArgName("value"))?;
            Ok((id, value, actual_table_name))
        })?;
        system_table_guard(&table_name, false)?;
        let document = UserFacingModel::new(tx, component.into())
            .patch(id, value)
            .await?;
        Ok(document.to_internal_json())
    }

    #[fastrace::trace]
    #[convex_macro::instrument_future]
    async fn replace(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct ReplaceArgs {
            #[serde(default)]
            table: Option<String>,
            id: String,
            value: JsonValue,
        }
        let table_filter = provider.table_filter();
        let component = provider.component()?;
        let tx = provider.tx()?;
        let (id, value, table_name) = with_argument_error("db.replace", || {
            let args: ReplaceArgs = serde_json::from_value(args)?;
            let id = DeveloperDocumentId::decode(&args.id).context(ArgName("id"))?;
            let actual_table_name = tx
                .resolve_idv6(id, component.into(), table_filter)
                .context(ArgName("id"))?;
            check_table_name(&args.table, &actual_table_name)?;
            let value = ConvexValue::try_from(args.value).context(ArgName("value"))?;
            Ok((
                id,
                value.try_into().context(ArgName("value"))?,
                actual_table_name,
            ))
        })?;
        system_table_guard(&table_name, false)?;
        let document = UserFacingModel::new(tx, component.into())
            .replace(id, value)
            .await?;
        Ok(document.to_internal_json())
    }

    #[fastrace::trace]
    #[convex_macro::instrument_future]
    async fn query_batch(
        provider: &mut P,
        batch_args: Vec<AsyncRead>,
    ) -> Vec<anyhow::Result<JsonValue>> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct GetArgs {
            #[serde(default)]
            table: Option<String>,
            id: String,
            #[serde(default)]
            is_system: bool,
            #[serde(default)]
            version: Option<String>,
        }
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct QueryStreamNextArgs {
            query_id: u32,
        }

        let table_filter = provider.table_filter();
        let mut queries_to_fetch = BTreeMap::new();
        let mut results = BTreeMap::new();
        let batch_size = batch_args.len();
        for (idx, args) in batch_args.into_iter().enumerate() {
            let result: anyhow::Result<_> = try {
                match args {
                    AsyncRead::QueryStreamNext(args) => {
                        let query_id = with_argument_error("queryStreamNext", || {
                            let args: QueryStreamNextArgs = serde_json::from_value(args)?;
                            Ok(args.query_id)
                        })?;
                        let managed_query = provider
                            .take_query(query_id)
                            .context(ErrorMetadata::bad_request(
                                "QueryNotFound",
                                "in-progress query not found",
                            ))?;
                        let local_query = match managed_query {
                            ManagedQuery::Pending { query, version } => {
                                let component = provider.component()?;
                                DeveloperQuery::new_with_version(
                                    provider.tx()?,
                                    component.into(),
                                    query,
                                    version,
                                    table_filter,
                                )?
                            },
                            ManagedQuery::Active(local_query) => local_query,
                        };
                        Some((Some(query_id), local_query))
                    },
                    AsyncRead::Get(args) => {
                        let component = provider.component()?;
                        let tx = provider.tx()?;
                        let args = with_argument_error("db.get", || {
                            Ok(serde_json::from_value::<GetArgs>(args)?)
                        })?;
                        let method_name = if args.is_system {
                            "db.system.get"
                        } else {
                            "db.get"
                        };
                        let (id, is_system, version) = with_argument_error(method_name, || {
                            let id =
                                DeveloperDocumentId::decode(&args.id).context(ArgName("id"))?;
                            let version = parse_version(args.version)?;
                            Ok((id, args.is_system, version))
                        })?;

                        let name: Result<TableName, anyhow::Error> = tx
                            .all_tables_number_to_name(component.into(), table_filter)(
                            id.table()
                        );
                        if name.is_ok() {
                            system_table_guard(&name?, is_system)?;
                        }
                        match tx.resolve_idv6(id, component.into(), table_filter) {
                            Ok(table_name) => {
                                with_argument_error(method_name, || {
                                    check_table_name(&args.table, &table_name)
                                })?;
                                let query = Query::get(table_name, id);
                                Some((
                                    None,
                                    DeveloperQuery::new_with_version(
                                        tx,
                                        component.into(),
                                        query,
                                        version,
                                        table_filter,
                                    )?,
                                ))
                            },
                            Err(_) => {
                                // Get on a non-existent table should return null.
                                None
                            },
                        }
                    },
                }
            };
            match result {
                Err(e) => {
                    assert!(results.insert(idx, Err(e)).is_none());
                },
                Ok(Some((query_id, query_to_fetch))) => {
                    assert!(queries_to_fetch
                        .insert(idx, (query_id, query_to_fetch))
                        .is_none());
                },
                Ok(None) => {
                    assert!(results.insert(idx, Ok(JsonValue::Null)).is_none());
                },
            }
        }

        let tx = match provider.tx() {
            Ok(tx) => tx,
            Err(e) => {
                return (0..batch_size).map(|_| Err(e.clone_error())).collect_vec();
            },
        };
        let mut fetch_results = query_batch_next(
            queries_to_fetch
                .iter_mut()
                .map(|(idx, (_, local_query))| (*idx, (local_query, None)))
                .collect(),
            tx,
        )
        .await;

        #[derive(Serialize)]
        struct QueryStreamNextResult {
            value: JsonValue,
            done: bool,
        }
        for (batch_key, (query_id, local_query)) in queries_to_fetch {
            let result: anyhow::Result<_> = try {
                if let Some(query_id) = query_id {
                    provider.insert_query(query_id, local_query);
                }
                let maybe_next = fetch_results
                    .remove(&batch_key)
                    .context("batch_key missing")??;
                let done = maybe_next.is_none();
                let value = match maybe_next {
                    Some((doc, _)) => doc.into_value().0.into(),
                    None => ConvexValue::Null,
                };
                if let Some(query_id) = query_id {
                    if done {
                        provider.cleanup_query(query_id);
                    }
                    serde_json::to_value(QueryStreamNextResult {
                        value: value.into(),
                        done,
                    })?
                } else {
                    value.into()
                }
            };
            results.insert(batch_key, result);
        }
        assert_eq!(results.len(), batch_size);
        results.into_values().collect()
    }

    #[fastrace::trace]
    #[convex_macro::instrument_future]
    async fn query_page(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        DatabaseSyscallsShared::query_page(provider, args).await
    }

    #[fastrace::trace]
    #[convex_macro::instrument_future]
    async fn remove(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct RemoveArgs {
            #[serde(default)]
            table: Option<String>,
            id: String,
        }
        let table_filter = provider.table_filter();
        let component = provider.component()?;
        let tx = provider.tx()?;
        let (id, table_name) = with_argument_error("db.delete", || {
            let args: RemoveArgs = serde_json::from_value(args)?;
            let id = DeveloperDocumentId::decode(&args.id).context(ArgName("id"))?;
            let actual_table_name = tx
                .resolve_idv6(id, component.into(), table_filter)
                .context(ArgName("id"))?;
            check_table_name(&args.table, &actual_table_name)?;
            Ok((id, actual_table_name))
        })?;
        system_table_guard(&table_name, false)?;
        let document = UserFacingModel::new(tx, component.into())
            .delete(id)
            .await?;
        Ok(document.to_internal_json())
    }

    #[convex_macro::instrument_future]
    async fn run_udf(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct RunUdfArgs {
            udf_type: String,
            name: Option<String>,
            reference: Option<String>,
            function_handle: Option<String>,
            args: JsonValue,
        }
        let RunUdfArgs {
            udf_type,
            name,
            reference,
            function_handle,
            args,
        } = with_argument_error("runUdf", || Ok(serde_json::from_value(args)?))?;
        let (udf_type, args) = with_argument_error("runUdf", || {
            let udf_type: UdfType = udf_type.parse().context(ArgName("udfType"))?;
            let args: ConvexObject = ConvexValue::try_from(args)
                .context(ArgName("args"))?
                .try_into()
                .context(ArgName("args"))?;
            Ok((udf_type, args))
        })?;
        let path = match function_handle {
            Some(function_handle) => {
                let handle: FunctionHandle =
                    with_argument_error("runUdf", || function_handle.parse())?;
                let path = provider.lookup_function_handle(handle).await?;
                let tx = provider.tx()?;
                let (_, component) = BootstrapComponentsModel::new(tx)
                    .must_component_path_to_ids(&path.component)?;
                ResolvedComponentFunctionPath {
                    component,
                    udf_path: path.udf_path,
                    component_path: Some(path.component),
                }
            },
            None => {
                let reference = parse_name_or_reference("runUdf", name, reference)?;
                let resource = provider.resolve(reference).await?;
                match resource {
                    Resource::ResolvedSystemUdf(path) => path,
                    Resource::Value(_) => {
                        anyhow::bail!(ErrorMetadata::bad_request(
                            "InvalidResource",
                            "Cannot execute a value resource"
                        ));
                    },
                    Resource::Function(path) => {
                        let tx = provider.tx()?;
                        let (_, component) = BootstrapComponentsModel::new(tx)
                            .must_component_path_to_ids(&path.component)?;
                        ResolvedComponentFunctionPath {
                            component,
                            udf_path: path.udf_path,
                            component_path: Some(path.component),
                        }
                    },
                }
            },
        };
        let value = provider.run_udf(udf_type, path, args).await?;
        Ok(value.into())
    }

    async fn create_function_handle(
        provider: &mut P,
        args: JsonValue,
    ) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct CreateFunctionHandleArgs {
            name: Option<String>,
            function_handle: Option<String>,
            reference: Option<String>,
        }
        let CreateFunctionHandleArgs {
            name,
            function_handle,
            reference,
        } = with_argument_error("createFunctionHandle", || Ok(serde_json::from_value(args)?))?;
        let function_path = match function_handle {
            Some(function_handle) => {
                return Ok(serde_json::to_value(function_handle)?);
            },
            None => {
                let reference = parse_name_or_reference("createFunctionHandle", name, reference)?;
                match provider.resolve(reference).await? {
                    Resource::Function(path) => path,
                    Resource::ResolvedSystemUdf { .. } => {
                        anyhow::bail!("Cannot create function handle for system UDF");
                    },
                    Resource::Value(_) => {
                        anyhow::bail!(ErrorMetadata::bad_request(
                            "InvalidResource",
                            "Cannot create a function handle for a value resource"
                        ));
                    },
                }
            },
        };
        let handle = provider.create_function_handle(function_path).await?;
        Ok(serde_json::to_value(String::from(handle))?)
    }
}
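// Illustrative sketch (not part of the original source): the wire shape of one
// unbatched syscall as dispatched by `run_async_syscall_batch` above. The
// argument and result values are hypothetical; the field names mirror this
// impl's deserializers (`InsertArgs`) and its `json!({ "_id": ... })` reply,
// and each result is handed back to JS as a JSON-encoded string.
//
//   name:   "1.0/insert"
//   args:   { "table": "messages", "value": { "body": "hi" } }
//   result: "{\"_id\":\"<DocumentIdv6>\"}"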
struct DatabaseSyscallsShared<RT: Runtime, P: AsyncSyscallProvider<RT>> {
    _pd: PhantomData<(RT, P)>,
}

/// As pages of results are commonly returned directly from UDFs, a page should
/// be convertible to Value::Array, which has a size limit of MAX_ARRAY_LEN.
/// If there are more results than 75% of that, we recommend splitting the
/// page.
const SOFT_MAX_PAGE_LEN: usize = soft_data_limit(8192);

#[derive(Debug, Copy, Clone)]
enum QueryPageStatus {
    SplitRequired,
    SplitRecommended,
}

impl QueryPageStatus {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::SplitRecommended => "SplitRecommended",
            Self::SplitRequired => "SplitRequired",
        }
    }
}

struct QueryPageMetadata {
    cursor: Cursor,
    split_cursor: Option<Cursor>,
    page_status: Option<QueryPageStatus>,
}

impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsShared<RT, P> {
    async fn read_page_from_query(
        mut query: DeveloperQuery<RT>,
        tx: &mut Transaction<RT>,
        page_size: usize,
    ) -> anyhow::Result<(Vec<DeveloperDocument>, QueryPageMetadata)> {
        let end_cursor = query.end_cursor();
        let has_end_cursor = end_cursor.is_some();
        let mut page = Vec::with_capacity(page_size);
        let mut page_status = None;
        // If we don't have an end cursor, collect results until we hit our page size.
        // If we do have an end cursor, ignore the page size and collect everything.
        while has_end_cursor || (page.len() < page_size) {
            // If we don't have an end cursor, we really have no idea
            // how many results we need to prefetch, but we can
            // use the original page size as a hint.
            let prefetch_hint = if has_end_cursor {
                Some(page_size)
            } else {
                Some(page_size - page.len())
            };
            let next_value = match query.next(tx, prefetch_hint).await {
                Ok(Some(v)) => v,
                Ok(None) => {
                    break;
                },
                Err(e) => {
                    if e.is_pagination_limit() {
                        page_status = Some(QueryPageStatus::SplitRequired);
                        if query.cursor().is_none() {
                            // Intentionally drop ErrorMetadata because this should
                            // be impossible, so we want to throw a system error instead.
                            anyhow::bail!(
                                "This should be impossible. Hit pagination limit before setting \
                                 query cursor: {e:?}"
                            );
                        }
                        break;
                    }
                    anyhow::bail!(e);
                },
            };
            page.push(next_value)
        }
        if page_status.is_none()
            && (query.is_approaching_data_limit() || page.len() > SOFT_MAX_PAGE_LEN)
        {
            page_status = Some(QueryPageStatus::SplitRecommended);
        }
        let cursor = end_cursor.or_else(|| query.cursor()).context(
            "Cursor was None. This should be impossible if `.next` was called on the query.",
        )?;
        Ok((
            page,
            QueryPageMetadata {
                cursor,
                split_cursor: query.split_cursor(),
                page_status,
            },
        ))
    }
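    // Illustrative sketch (not part of the original source): how a caller is
    // expected to drive reactive pagination. Feed `continueCursor` back in as
    // the next page's cursor until `isDone`; `fetch_page` and `consume` are
    // hypothetical helpers standing in for the `1.0/queryPage` syscall and the
    // application code.
    //
    //   let mut cursor: Option<String> = None;
    //   loop {
    //       let page = fetch_page(cursor.take(), /* page_size */ 100).await?;
    //       consume(&page.page);
    //       if page.is_done {
    //           break;
    //       }
    //       cursor = Some(page.continue_cursor);
    //   }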
    #[fastrace::trace]
    async fn query_page(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
        #[derive(Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct QueryPageArgs {
            query: JsonValue,
            cursor: Option<String>,
            end_cursor: Option<String>,
            page_size: usize,
            maximum_rows_read: Option<usize>,
            maximum_bytes_read: Option<usize>,
            #[serde(default)]
            version: Option<String>,
        }
        let args: QueryPageArgs =
            with_argument_error("queryPage", || Ok(serde_json::from_value(args)?))?;
        let parsed_query = with_argument_error("queryPage", || {
            Query::try_from(args.query).context(ArgName("query"))
        })?;
        let version = parse_version(args.version)?;
        let table_filter = provider.table_filter();

        let page_size = args.page_size;
        if page_size == 0 {
            anyhow::bail!(ErrorMetadata::bad_request(
                "NoDocumentsForPagination",
                "Must request at least 1 document while paginating"
            ));
        }
        if page_size > *TRANSACTION_MAX_READ_SIZE_ROWS {
            anyhow::bail!(ErrorMetadata::bad_request(
                "PageSizeTooLarge",
                format!("Requested too many items: {page_size}")
            ));
        }
        if args.maximum_rows_read == Some(0) || args.maximum_bytes_read == Some(0) {
            anyhow::bail!(ErrorMetadata::bad_request(
                "InvalidPaginationLimit",
                "maximumRowsRead and maximumBytesRead must be greater than 0"
            ));
        }

        let start_cursor = args
            .cursor
            .map(|c| {
                provider
                    .key_broker()
                    .decrypt_cursor(c, provider.persistence_version())
            })
            .transpose()?;

        let end_cursor = match args.end_cursor {
            Some(end_cursor) => Some(
                provider
                    .key_broker()
                    .decrypt_cursor(end_cursor, provider.persistence_version())?,
            ),
            None => provider.prev_journal().end_cursor.clone(),
        };

        let component = provider.component()?;
        if !component.is_root() && !provider.is_system() {
            anyhow::bail!(ErrorMetadata::bad_request(
                "PaginationUnsupportedInComponents",
                "paginate() is only supported in the app.",
            ));
        }
        let tx = provider.tx()?;

        let (
            page,
            QueryPageMetadata {
                cursor,
                split_cursor,
                page_status,
            },
        ) = {
            let query = DeveloperQuery::new_bounded(
                tx,
                component.into(),
                parsed_query,
                PaginationOptions::ReactivePagination {
                    start_cursor,
                    end_cursor,
                    maximum_rows_read: args.maximum_rows_read,
                    maximum_bytes_read: args.maximum_bytes_read,
                },
                version,
                table_filter,
            )?;
            let (page, metadata) = Self::read_page_from_query(query, tx, page_size).await?;
            let page = page.into_iter().map(|doc| doc.to_internal_json()).collect();
            (page, metadata)
        };

        let page_status = page_status.map(|s| s.as_str());
        // Place split_cursor in the middle.
        let split_cursor = split_cursor.map(|split| {
            provider
                .key_broker()
                .encrypt_cursor(&split, provider.persistence_version())
        });
        let continue_cursor = provider
            .key_broker()
            .encrypt_cursor(&cursor, provider.persistence_version());
        let is_done = matches!(
            cursor,
            Cursor {
                position: CursorPosition::End,
                ..
            }
        );
        anyhow::ensure!(
            provider.next_journal().end_cursor.is_none(),
            ErrorMetadata::bad_request(
                "MultiplePaginatedDatabaseQueries",
                "This query or mutation function ran multiple paginated queries. Convex only \
                 supports a single paginated query in each function.",
            )
        );
        provider.next_journal().end_cursor = Some(cursor);

        #[derive(Serialize)]
        #[serde(rename_all = "camelCase")]
        struct QueryPageResult {
            page: Vec<JsonValue>,
            is_done: bool,
            continue_cursor: String,
            split_cursor: Option<String>,
            page_status: Option<&'static str>,
        }
        let result = QueryPageResult {
            page,
            is_done,
            continue_cursor,
            split_cursor,
            page_status,
        };
        Ok(serde_json::to_value(result)?)
    }
}
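// Illustrative sketch (not part of the original source): the JSON produced by
// `QueryPageResult` above after serde's camelCase renaming, with hypothetical
// cursor strings. `pageStatus` is null unless a split is recommended or
// required.
//
//   {
//     "page": [ /* documents */ ],
//     "isDone": false,
//     "continueCursor": "<encrypted cursor>",
//     "splitCursor": "<encrypted cursor or null>",
//     "pageStatus": "SplitRecommended" | "SplitRequired" | null
//   }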
