Skip to main content
Glama

Convex MCP server

Official
by get-convex
mod.rs (77.1 kB)
use core::sync::atomic::Ordering; use std::{ collections::{ BTreeMap, BTreeSet, }, sync::{ atomic::AtomicUsize, Arc, LazyLock, }, time::Duration, }; use anyhow::Context; use async_trait::async_trait; use authentication::token_to_authorization_header; use common::{ auth::AuthConfig, backoff::Backoff, bootstrap_model::components::{ definition::ComponentDefinitionMetadata, handles::FunctionHandle, }, components::{ CanonicalizedComponentFunctionPath, CanonicalizedComponentModulePath, ComponentDefinitionPath, ComponentId, ComponentName, ComponentPath, PublicFunctionPath, Resource, }, errors::JsError, execution_context::ExecutionContext, fastrace_helpers::EncodedSpan, knobs::{ APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT, APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS, APPLICATION_MAX_CONCURRENT_MUTATIONS, APPLICATION_MAX_CONCURRENT_NODE_ACTIONS, APPLICATION_MAX_CONCURRENT_QUERIES, APPLICATION_MAX_CONCURRENT_V8_ACTIONS, DEFAULT_APPLICATION_MAX_FUNCTION_CONCURRENCY, ISOLATE_MAX_USER_HEAP_SIZE, UDF_EXECUTOR_OCC_INITIAL_BACKOFF, UDF_EXECUTOR_OCC_MAX_BACKOFF, UDF_EXECUTOR_OCC_MAX_RETRIES, }, log_lines::{ run_function_and_collect_log_lines, LogLine, LogLines, }, query_journal::QueryJournal, runtime::{ Runtime, UnixTimestamp, }, schemas::DatabaseSchema, tokio::sync::{ Semaphore, SemaphorePermit, }, types::{ AllowedVisibility, FunctionCaller, ModuleEnvironment, NodeDependency, Timestamp, UdfType, }, value::ConvexArray, RequestId, }; use database::{ unauthorized_error, Database, Token, Transaction, }; use errors::{ ErrorMetadata, ErrorMetadataAnyhowExt, }; use file_storage::TransactionalFileStorage; use function_runner::{ server::{ FunctionMetadata, HttpActionMetadata, }, FunctionReads, FunctionRunner, FunctionWrites, }; use futures::{ select_biased, FutureExt, }; use isolate::ActionCallbacks; use keybroker::{ Identity, KeyBroker, }; use model::{ backend_state::BackendStateModel, components::handles::FunctionHandlesModel, config::{ module_loader::ModuleLoader, types::ModuleConfig, }, 
environment_variables::{ types::{ EnvVarName, EnvVarValue, }, EnvironmentVariablesModel, }, external_packages::{ types::ExternalDepsPackage, ExternalPackagesModel, }, file_storage::{ types::FileStorageEntry, FileStorageId, }, modules::{ module_versions::{ AnalyzedModule, ModuleSource, SourceMap, }, ModuleModel, }, scheduled_jobs::VirtualSchedulerModel, session_requests::{ types::{ SessionRequestIdentifier, SessionRequestOutcome, SessionRequestRecord, }, SessionRequestModel, }, source_packages::{ types::SourcePackage, SourcePackageModel, }, udf_config::types::UdfConfig, }; use node_executor::{ Actions, AnalyzeRequest, BuildDepsRequest, ExecuteRequest, }; use serde_json::Value as JsonValue; use storage::Storage; use sync_types::{ types::SerializedArgs, CanonicalizedModulePath, }; use tokio::sync::mpsc; use udf::{ environment::system_env_vars, helpers::parse_udf_args, validation::{ validate_schedule_args, ValidatedActionOutcome, ValidatedPathAndArgs, ValidatedUdfOutcome, }, ActionOutcome, EvaluateAppDefinitionsResult, FunctionOutcome, FunctionResult, HttpActionOutcome, UdfOutcome, }; use usage_tracking::{ FunctionUsageStats, FunctionUsageTracker, OccInfo, }; use value::{ id_v6::DeveloperDocumentId, identifier::Identifier, JsonPackedValue, TableNamespace, }; use vector::{ PublicVectorSearchQueryResult, VectorSearch, }; use self::metrics::{ function_waiter_timer, log_occ_retries, log_outstanding_functions, log_udf_executor_result, mutation_timer, OutstandingFunctionState, UdfExecutorResult, }; use crate::{ application_function_runner::metrics::{ function_run_timer, function_total_timer, log_function_wait_timeout, log_mutation_already_committed, }, cache::{ CacheManager, QueryCache, }, function_log::{ ActionCompletion, FunctionExecutionLog, }, ActionError, ActionReturn, MutationError, MutationReturn, QueryReturn, }; mod http_routing; mod metrics; static BUILD_DEPS_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| Duration::from_secs(1200)); /// Wrapper for [IsolateClient]s 
and [FunctionRunner]s that determines where to /// route requests. #[derive(Clone)] pub struct FunctionRouter<RT: Runtime> { pub(crate) function_runner: Arc<dyn FunctionRunner<RT>>, query_limiter: Arc<Limiter>, mutation_limiter: Arc<Limiter>, action_limiter: Arc<Limiter>, http_action_limiter: Arc<Limiter>, rt: RT, database: Database<RT>, default_system_env_vars: BTreeMap<EnvVarName, EnvVarValue>, } impl<RT: Runtime> FunctionRouter<RT> { pub fn new( function_runner: Arc<dyn FunctionRunner<RT>>, rt: RT, database: Database<RT>, default_system_env_vars: BTreeMap<EnvVarName, EnvVarValue>, ) -> Self { Self { function_runner, rt, database, default_system_env_vars, query_limiter: Arc::new(Limiter::new( ModuleEnvironment::Isolate, UdfType::Query, *APPLICATION_MAX_CONCURRENT_QUERIES, )), mutation_limiter: Arc::new(Limiter::new( ModuleEnvironment::Isolate, UdfType::Mutation, *APPLICATION_MAX_CONCURRENT_MUTATIONS, )), action_limiter: Arc::new(Limiter::new( ModuleEnvironment::Isolate, UdfType::Action, *APPLICATION_MAX_CONCURRENT_V8_ACTIONS, )), http_action_limiter: Arc::new(Limiter::new( ModuleEnvironment::Isolate, UdfType::HttpAction, *APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS, )), } } } impl<RT: Runtime> FunctionRouter<RT> { #[fastrace::trace] pub(crate) async fn execute_query_or_mutation( &self, tx: Transaction<RT>, path_and_args: ValidatedPathAndArgs, udf_type: UdfType, journal: QueryJournal, context: ExecutionContext, ) -> anyhow::Result<(Transaction<RT>, FunctionOutcome)> { anyhow::ensure!(udf_type == UdfType::Query || udf_type == UdfType::Mutation); // All queries and mutations are run in the isolate environment. 
let timer = function_total_timer(ModuleEnvironment::Isolate, udf_type); let (tx, outcome) = self .function_runner_execute( tx, udf_type, context, None, Some(FunctionMetadata { journal, path_and_args, }), None, ) .await?; let tx = tx.with_context(|| format!("Missing transaction in response for {udf_type}"))?; timer.finish(); Ok((tx, outcome)) } #[fastrace::trace] pub(crate) async fn execute_action( &self, tx: Transaction<RT>, path_and_args: ValidatedPathAndArgs, log_line_sender: mpsc::UnboundedSender<LogLine>, context: ExecutionContext, ) -> anyhow::Result<ActionOutcome> { let (_, outcome) = self .function_runner_execute( tx, UdfType::Action, context, Some(log_line_sender), Some(FunctionMetadata { journal: QueryJournal::new(), path_and_args, }), None, ) .await?; let FunctionOutcome::Action(outcome) = outcome else { anyhow::bail!("Calling an action returned an invalid outcome") }; Ok(outcome) } #[fastrace::trace] pub(crate) async fn execute_http_action( &self, tx: Transaction<RT>, log_line_sender: mpsc::UnboundedSender<LogLine>, http_action_metadata: HttpActionMetadata, context: ExecutionContext, ) -> anyhow::Result<HttpActionOutcome> { // All http actions are run in the isolate environment. let timer = function_total_timer(ModuleEnvironment::Isolate, UdfType::HttpAction); let (_, outcome) = self .function_runner_execute( tx, UdfType::HttpAction, context, Some(log_line_sender), None, Some(http_action_metadata), ) .await?; let FunctionOutcome::HttpAction(outcome) = outcome else { anyhow::bail!("Calling an http action returned an invalid outcome") }; timer.finish(); Ok(outcome) } // Execute using the function runner. Can be used for v8 udfs other than http // actions. 
#[fastrace::trace] async fn function_runner_execute( &self, mut tx: Transaction<RT>, udf_type: UdfType, context: ExecutionContext, log_line_sender: Option<mpsc::UnboundedSender<LogLine>>, function_metadata: Option<FunctionMetadata>, http_action_metadata: Option<HttpActionMetadata>, ) -> anyhow::Result<(Option<Transaction<RT>>, FunctionOutcome)> { let in_memory_index_last_modified = self .database .snapshot(tx.begin_timestamp())? .in_memory_indexes .in_memory_indexes_last_modified(); let limiter = match udf_type { UdfType::Query => &self.query_limiter, UdfType::Mutation => &self.mutation_limiter, UdfType::Action => &self.action_limiter, UdfType::HttpAction => &self.http_action_limiter, }; let request_guard = limiter.acquire_permit_with_timeout(&self.rt).await?; let timer = function_run_timer(udf_type); let (function_tx, outcome, usage_stats) = self .function_runner .run_function( udf_type, tx.identity().clone(), tx.begin_timestamp(), tx.writes().as_flat()?.clone().into(), log_line_sender, function_metadata, http_action_metadata, self.default_system_env_vars.clone(), in_memory_index_last_modified, context, ) .await?; timer.finish(); drop(request_guard); // Add the usage stats to the current transaction tracker. tx.usage_tracker.add(usage_stats); // Apply the reads and writes to the current transaction let tx = if let Some(function_tx) = function_tx { let FunctionReads { reads, num_intervals, user_tx_size, system_tx_size, } = function_tx.reads; let FunctionWrites { updates } = function_tx.writes; tx.apply_function_runner_tx( function_tx.begin_timestamp, reads, num_intervals, user_tx_size, system_tx_size, updates, function_tx.rows_read_by_tablet, )?; Some(tx) } else { None }; Ok((tx, outcome)) } } // Used to limit upstream concurrency for a given function type. It also tracks // and log gauges for the number of waiting and currently running functions. struct Limiter { udf_type: UdfType, env: ModuleEnvironment, // Used to limit running functions. 
semaphore: Semaphore, total_permits: usize, // Total function requests, including ones still waiting on the semaphore. total_outstanding: AtomicUsize, } impl Limiter { fn new(env: ModuleEnvironment, udf_type: UdfType, total_permits: usize) -> Self { let limiter = Self { udf_type, env, semaphore: Semaphore::new(total_permits), total_permits, total_outstanding: AtomicUsize::new(0), }; // Update the gauges on startup. limiter.update_gauges(); limiter } #[fastrace::trace] async fn acquire_permit_with_timeout<'a, RT: Runtime>( &'a self, rt: &'a RT, ) -> anyhow::Result<RequestGuard<'a>> { let mut request_guard = self.start(); select_biased! { _ = request_guard.acquire_permit().fuse() => {}, _ = rt.wait(*APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT) => { log_function_wait_timeout(self.env, self.udf_type); anyhow::bail!(ErrorMetadata::rate_limited( "TooManyConcurrentRequests", format!( "Too many concurrent requests. Your backend is limited to {} concurrent {}s. {}", self.total_permits, self.udf_type.to_lowercase_string(), if self.total_permits > DEFAULT_APPLICATION_MAX_FUNCTION_CONCURRENCY { "If you need more resources, please contact support." } else { "To get more resources, upgrade to Convex Pro. If you are already on Convex Pro, please contact support." } ), )); }, } Ok(request_guard) } fn start(&self) -> RequestGuard<'_> { self.total_outstanding.fetch_add(1, Ordering::SeqCst); // Update the gauge to account for the newly waiting request. self.update_gauges(); RequestGuard { limiter: self, permit: None, } } // Updates the current waiting and running function gauges. 
fn update_gauges(&self) {
        // Running = permits currently handed out; waiting = every other
        // outstanding request.
        let running = self.total_permits - self.semaphore.available_permits();
        let outstanding = self.total_outstanding.load(Ordering::SeqCst);
        let waiting = outstanding.saturating_sub(running);
        for (count, state) in [
            (running, OutstandingFunctionState::Running),
            (waiting, OutstandingFunctionState::Waiting),
        ] {
            log_outstanding_functions(count, self.env, self.udf_type, state);
        }
    }
}

// Wraps a request to guarantee we correctly update the waiting and running
// gauges even if dropped.
struct RequestGuard<'a> {
    limiter: &'a Limiter,
    permit: Option<SemaphorePermit<'a>>,
}

impl RequestGuard<'_> {
    /// Waits for a permit from the limiter's semaphore and stores it in the
    /// guard. Panics if the guard already holds a permit.
    async fn acquire_permit(&mut self) -> anyhow::Result<()> {
        let timer = function_waiter_timer(self.limiter.udf_type);
        assert!(
            self.permit.is_none(),
            "Called `acquire_permit` more than once"
        );
        let permit = self.limiter.semaphore.acquire().await?;
        self.permit = Some(permit);
        timer.finish();
        // Update the gauge to account for the newly running function.
        self.limiter.update_gauges();
        Ok(())
    }
}

impl Drop for RequestGuard<'_> {
    fn drop(&mut self) {
        // Release the semaphore permit (if any) before updating gauges.
        self.permit = None;
        // Remove the request from the outstanding ones.
        self.limiter
            .total_outstanding
            .fetch_sub(1, Ordering::SeqCst);
        // Update the gauges to account for the newly finished request.
        self.limiter.update_gauges();
    }
}

/// Executes UDFs for backends.
///
/// This struct directly executes http and node actions. Queries, Mutations and
/// v8 Actions are instead routed through the FunctionRouter and its
/// FunctionRunner implementation.
pub struct ApplicationFunctionRunner<RT: Runtime> {
    runtime: RT,
    pub(crate) database: Database<RT>,
    key_broker: KeyBroker,
    isolate_functions: FunctionRouter<RT>,
    // Used for analyze, schema, etc.
node_actions: Actions<RT>,
    pub(crate) module_cache: Arc<dyn ModuleLoader<RT>>,
    modules_storage: Arc<dyn Storage>,
    file_storage: TransactionalFileStorage<RT>,
    function_log: FunctionExecutionLog<RT>,
    cache_manager: CacheManager<RT>,
    default_system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
    node_action_limiter: Limiter,
}

impl<RT: Runtime> ApplicationFunctionRunner<RT> {
    /// Wires up the runner: builds the [`FunctionRouter`] for isolate
    /// functions, the [`CacheManager`] that shares it, and the limiter for
    /// node actions.
    pub fn new(
        runtime: RT,
        database: Database<RT>,
        key_broker: KeyBroker,
        function_runner: Arc<dyn FunctionRunner<RT>>,
        node_actions: Actions<RT>,
        file_storage: TransactionalFileStorage<RT>,
        modules_storage: Arc<dyn Storage>,
        module_cache: Arc<dyn ModuleLoader<RT>>,
        function_log: FunctionExecutionLog<RT>,
        default_system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
        cache: QueryCache,
    ) -> Self {
        let isolate_functions = FunctionRouter::new(
            function_runner,
            runtime.clone(),
            database.clone(),
            default_system_env_vars.clone(),
        );
        let cache_manager = CacheManager::new(
            runtime.clone(),
            database.clone(),
            isolate_functions.clone(),
            function_log.clone(),
            cache,
        );
        // Node actions get their own limiter, separate from the router's.
        let node_action_limiter = Limiter::new(
            ModuleEnvironment::Node,
            UdfType::Action,
            *APPLICATION_MAX_CONCURRENT_NODE_ACTIONS,
        );
        Self {
            runtime,
            database,
            key_broker,
            isolate_functions,
            node_actions,
            module_cache,
            modules_storage,
            file_storage,
            function_log,
            cache_manager,
            default_system_env_vars,
            node_action_limiter,
        }
    }

    /// Shuts down the node actions executor.
    pub(crate) async fn shutdown(&self) -> anyhow::Result<()> {
        self.node_actions.shutdown();
        Ok(())
    }

    // Only used for running queries from REPLs.
pub async fn run_query_without_caching(
        &self,
        request_id: RequestId,
        mut tx: Transaction<RT>,
        path: CanonicalizedComponentFunctionPath,
        arguments: ConvexArray,
        caller: FunctionCaller,
    ) -> anyhow::Result<(Result<JsonPackedValue, JsError>, LogLines)> {
        // Restricted to admin and system identities.
        if !tx.identity().is_admin() && !tx.identity().is_system() {
            anyhow::bail!(unauthorized_error("query_without_caching"));
        }

        let identity = tx.inert_identity();
        let start = self.runtime.monotonic_now();
        let validation = ValidatedPathAndArgs::new(
            caller.allowed_visibility(),
            &mut tx,
            PublicFunctionPath::Component(path.clone()),
            arguments.clone(),
            UdfType::Query,
        )
        .await?;
        let context = ExecutionContext::new(request_id, &caller);

        // A validation failure is a developer error: it becomes a query
        // outcome (and is logged below) instead of failing the request.
        let (mut tx, function_outcome) = match validation {
            Ok(path_and_args) => {
                self.isolate_functions
                    .execute_query_or_mutation(
                        tx,
                        path_and_args,
                        UdfType::Query,
                        QueryJournal::new(),
                        context.clone(),
                    )
                    .await?
            },
            Err(js_err) => {
                let query_outcome = UdfOutcome::from_error(
                    js_err,
                    path.clone(),
                    arguments.clone(),
                    identity.clone(),
                    self.runtime.clone(),
                    None,
                )?;
                (tx, FunctionOutcome::Query(query_outcome))
            },
        };
        let FunctionOutcome::Query(outcome) = function_outcome else {
            anyhow::bail!("Received non-query outcome for query")
        };

        let stats = tx.take_stats();
        let result = outcome.result.clone();
        let log_lines = outcome.log_lines.clone();
        self.function_log
            .log_query(
                &outcome,
                stats,
                false,
                start.elapsed(),
                caller,
                tx.usage_tracker,
                context,
            )
            .await;
        Ok((result, log_lines))
    }

    /// Runs a mutation and retries on OCC errors.
#[fastrace::trace]
    pub async fn retry_mutation(
        &self,
        request_id: RequestId,
        path: PublicFunctionPath,
        arguments: Vec<JsonValue>,
        identity: Identity,
        mutation_identifier: Option<SessionRequestIdentifier>,
        caller: FunctionCaller,
        mutation_queue_length: Option<usize>,
    ) -> anyhow::Result<Result<MutationReturn, MutationError>> {
        // The timer covers the whole call, including every retry made by
        // `_retry_mutation`.
        let timer = mutation_timer();
        let result = self
            ._retry_mutation(
                request_id,
                path,
                arguments,
                identity,
                mutation_identifier,
                caller,
                mutation_queue_length,
            )
            .await;
        // Only an outer `Err` finishes the timer with an error status label;
        // `Ok(Err(MutationError))` finishes it like a success.
        match &result {
            Ok(_) => timer.finish(),
            Err(e) => timer.finish_with(e.metric_status_label_value()),
        };
        result
    }

    /// Runs a mutation and retries on OCC errors.
    ///
    /// Each attempt begins a fresh transaction, runs the mutation, records
    /// its status and tries to commit. A commit that fails with an OCC error
    /// is retried after a backoff while the number of failures is below
    /// `UDF_EXECUTOR_OCC_MAX_RETRIES`.
    ///
    /// Returns `Ok(Ok(_))` when the mutation committed, `Ok(Err(_))` for a
    /// developer error (invalid arguments, an error inside the UDF, or a
    /// deterministic user error at commit), and `Err(_)` otherwise.
    #[fastrace::trace]
    async fn _retry_mutation(
        &self,
        request_id: RequestId,
        path: PublicFunctionPath,
        arguments: Vec<JsonValue>,
        identity: Identity,
        mutation_identifier: Option<SessionRequestIdentifier>,
        caller: FunctionCaller,
        mutation_queue_length: Option<usize>,
    ) -> anyhow::Result<Result<MutationReturn, MutationError>> {
        // System paths may only be called by admin or system identities.
        if path.is_system() && !(identity.is_admin() || identity.is_system()) {
            anyhow::bail!(unauthorized_error("mutation"));
        }
        // Argument parse failures are developer errors, returned without
        // starting a transaction.
        let arguments = match parse_udf_args(path.udf_path(), arguments) {
            Ok(arguments) => arguments,
            Err(error) => {
                return Ok(Err(MutationError {
                    error,
                    log_lines: vec![].into(),
                }))
            },
        };
        // Passed to the commit as the write source; `None` for system paths.
        let udf_path_string = (!path.is_system()).then_some(path.udf_path().to_string());

        let mut backoff = Backoff::new(
            *UDF_EXECUTOR_OCC_INITIAL_BACKOFF,
            *UDF_EXECUTOR_OCC_MAX_BACKOFF,
        );

        loop {
            let mutation_retry_count = backoff.failures() as usize;

            let usage_tracker = FunctionUsageTracker::new();
            // Note that we use different context for every mutation attempt.
            // This is so every JS function run gets a different executionId.
            let context = ExecutionContext::new(request_id.clone(), &caller);

            let start = self.runtime.monotonic_now();
            let mut tx = self
                .database
                .begin_with_usage(identity.clone(), usage_tracker.clone())
                .await?;
            let pause_client = self.runtime.pause_client();
            pause_client.wait("retry_mutation_loop_start").await;
            // Shadows the caller-supplied identity for the rest of this
            // iteration; it is only used for system-error logging below.
            let identity = tx.inert_identity();

            // Return the previous execution's result if the mutation was committed already.
            if let Some(result) = self
                .check_mutation_status(&mut tx, &mutation_identifier)
                .await?
            {
                return Ok(result);
            }

            let result: Result<(Transaction<RT>, ValidatedUdfOutcome), anyhow::Error> = self
                .run_mutation_no_udf_log(
                    tx,
                    path.clone(),
                    arguments.clone(),
                    caller.allowed_visibility(),
                    context.clone(),
                    mutation_queue_length,
                )
                .await;
            // A failure to run the mutation at all is logged as a system
            // error and returned without retrying.
            let (mut tx, mut outcome) = match result {
                Ok(r) => r,
                Err(e) => {
                    self.function_log
                        .log_mutation_system_error(
                            &e,
                            path.debug_into_component_path(),
                            arguments,
                            identity,
                            start,
                            caller,
                            context.clone(),
                            mutation_queue_length,
                            mutation_retry_count,
                        )
                        .await?;
                    return Err(e);
                },
            };

            // Save a CommittedMutation object so we won't rerun this mutation if
            // successful.
            self.write_mutation_status(&mut tx, &mutation_identifier, &outcome)
                .await?;

            let stats = tx.take_stats();
            let execution_time = start.elapsed();
            let log_lines = outcome.log_lines.clone();
            let value = match outcome.result {
                Ok(ref value) => value.clone(),
                // If it's an error inside the UDF, log the failed execution and return the
                // developer error.
                Err(ref error) => {
                    // The transaction is dropped without being committed.
                    drop(tx);
                    self.function_log
                        .log_mutation(
                            outcome.clone(),
                            stats,
                            execution_time,
                            caller,
                            usage_tracker,
                            context.clone(),
                            mutation_queue_length,
                            mutation_retry_count,
                        )
                        .await;
                    return Ok(Err(MutationError {
                        error: error.to_owned(),
                        log_lines,
                    }));
                },
            };

            // Attempt to commit the transaction and log an error if commit failed,
            // even if it was an OCC error. We may decide later to suppress OCC
            // errors from the log.
            let result = match self
                .database
                .commit_with_write_source(tx, udf_path_string.clone())
                .await
            {
                Ok(ts) => Ok(MutationReturn {
                    value,
                    log_lines,
                    ts,
                }),
                Err(e) => {
                    if e.is_deterministic_user_error() {
                        // Reported to the developer as a mutation error and
                        // logged through the common `log_mutation` call below.
                        let js_error = JsError::from_error(e);
                        outcome.result = Err(js_error.clone());
                        Err(MutationError {
                            error: js_error,
                            log_lines,
                        })
                    } else {
                        // OCC failure with retries left: back off, log the
                        // attempt and start a new iteration.
                        if e.is_occ()
                            && (backoff.failures() as usize) < *UDF_EXECUTOR_OCC_MAX_RETRIES
                        {
                            let sleep = backoff.fail(&mut self.runtime.rng());
                            tracing::warn!(
                                "Optimistic concurrency control failed ({e}), retrying \
                                 {udf_path_string:?} after {sleep:?}",
                            );
                            self.runtime.wait(sleep).await;
                            let (table_name, document_id, write_source) =
                                e.occ_info().unwrap_or((None, None, None));
                            self.function_log
                                .log_mutation_occ_error(
                                    outcome,
                                    stats,
                                    execution_time,
                                    caller.clone(),
                                    usage_tracker,
                                    context.clone(),
                                    OccInfo {
                                        table_name,
                                        document_id,
                                        write_source,
                                        retry_count: mutation_retry_count as u64,
                                    },
                                    mutation_queue_length,
                                    mutation_retry_count,
                                )
                                .await;
                            continue;
                        }
                        // No retry: either the OCC retry budget is used up or
                        // the commit failed for another reason.
                        outcome.result = Err(JsError::from_error_ref(&e));

                        if e.is_occ() {
                            let (table_name, document_id, write_source) =
                                e.occ_info().unwrap_or((None, None, None));
                            self.function_log
                                .log_mutation_occ_error(
                                    outcome,
                                    stats,
                                    execution_time,
                                    caller,
                                    usage_tracker,
                                    context.clone(),
                                    OccInfo {
                                        table_name,
                                        document_id,
                                        write_source,
                                        retry_count: mutation_retry_count as u64,
                                    },
                                    mutation_queue_length,
                                    mutation_retry_count,
                                )
                                .await;
                        } else {
                            self.function_log
                                .log_mutation_system_error(
                                    &e,
                                    path.debug_into_component_path(),
                                    arguments,
                                    identity,
                                    start,
                                    caller,
                                    context,
                                    mutation_queue_length,
                                    mutation_retry_count,
                                )
                                .await?;
                        }
                        log_occ_retries(backoff.failures() as usize);
                        return Err(e);
                    }
                },
            };

            // Reached on a successful commit and on a deterministic user
            // error at commit time.
            self.function_log
                .log_mutation(
                    outcome.clone(),
                    stats,
                    execution_time,
                    caller,
                    usage_tracker,
                    context.clone(),
                    mutation_queue_length,
                    mutation_retry_count,
                )
                .await;
            log_occ_retries(backoff.failures() as usize);
            return Ok(result);
        }
    }

    /// Attempts to run a mutation once using the given transaction.
/// The method is not idempotent. It is the caller responsibility to /// drive retries as we as log in the UDF log. pub async fn run_mutation_no_udf_log( &self, tx: Transaction<RT>, path: PublicFunctionPath, arguments: ConvexArray, allowed_visibility: AllowedVisibility, context: ExecutionContext, mutation_queue_length: Option<usize>, ) -> anyhow::Result<(Transaction<RT>, ValidatedUdfOutcome)> { let result = self .run_mutation_inner( tx, path, arguments, allowed_visibility, context, mutation_queue_length, ) .await; match result.as_ref() { Ok((_, udf_outcome)) => { let result = if udf_outcome.result.is_ok() { UdfExecutorResult::Success } else { UdfExecutorResult::UserError }; log_udf_executor_result(UdfType::Mutation, result); }, Err(e) => { log_udf_executor_result( UdfType::Mutation, UdfExecutorResult::SystemError(e.metric_status_label_value()), ); }, }; result } /// Runs the mutation once without any logging. #[fastrace::trace] async fn run_mutation_inner( &self, mut tx: Transaction<RT>, path: PublicFunctionPath, arguments: ConvexArray, allowed_visibility: AllowedVisibility, context: ExecutionContext, mutation_queue_length: Option<usize>, ) -> anyhow::Result<(Transaction<RT>, ValidatedUdfOutcome)> { if path.is_system() && !(tx.identity().is_admin() || tx.identity().is_system()) { anyhow::bail!(unauthorized_error("mutation")); } let identity = tx.inert_identity(); let validate_result = ValidatedPathAndArgs::new_with_returns_validator( allowed_visibility, &mut tx, path.clone(), arguments.clone(), UdfType::Mutation, ) .await?; let (path_and_args, returns_validator) = match validate_result { Ok(tuple) => tuple, Err(js_err) => { let mutation_outcome = ValidatedUdfOutcome::from_error( js_err, path.debug_into_component_path(), arguments.clone(), identity.clone(), self.runtime.clone(), None, )?; return Ok((tx, mutation_outcome)); }, }; let path = path_and_args.path().clone(); let (mut tx, outcome) = self .isolate_functions .execute_query_or_mutation( tx, path_and_args, 
UdfType::Mutation, QueryJournal::new(), context, ) .await?; let mutation_outcome = match outcome { FunctionOutcome::Mutation(o) => o, _ => anyhow::bail!("Received non-mutation outcome for mutation"), }; let component = path.component; let table_mapping = tx.table_mapping().namespace(component.into()); let outcome = ValidatedUdfOutcome::new( mutation_outcome, returns_validator, &table_mapping, mutation_queue_length, ); Ok((tx, outcome)) } #[fastrace::trace] pub async fn run_action( &self, request_id: RequestId, path: PublicFunctionPath, arguments: Vec<JsonValue>, identity: Identity, caller: FunctionCaller, ) -> anyhow::Result<Result<ActionReturn, ActionError>> { if path.is_system() && !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("action")); } let arguments = match parse_udf_args(path.udf_path(), arguments) { Ok(arguments) => arguments, Err(error) => { return Ok(Err(ActionError { error, log_lines: vec![].into(), })) }, }; let context = ExecutionContext::new(request_id.clone(), &caller); let usage_tracking = FunctionUsageTracker::new(); let start = self.runtime.monotonic_now(); let completion_result = self .run_action_no_udf_log( path.clone(), arguments.clone(), identity.clone(), caller.clone(), usage_tracking.clone(), context.clone(), ) .await; let completion = match completion_result { Ok(c) => c, Err(e) => { self.function_log .log_action_system_error( &e, path.debug_into_component_path(), arguments, identity.into(), start, caller, vec![].into(), context, ) .await?; anyhow::bail!(e) }, }; let log_lines = completion.log_lines().clone(); let result = completion.outcome.result.clone(); self.function_log .log_action(completion, usage_tracking) .await; let value = match result { Ok(value) => value, // If it's an error inside the UDF, log the failed execution and return the // developer error. 
Err(error) => return Ok(Err(ActionError { error, log_lines })),
        };

        Ok(Ok(ActionReturn { value, log_lines }))
    }

    /// Runs the action without logging to the UDF log. It is the caller's
    /// responsibility to log to the UDF log.
    ///
    /// Delegates to `run_action_inner` and reports the executor-level result
    /// through `log_udf_executor_result`: `Success` when the completed action's
    /// outcome is `Ok`, `UserError` when the outcome carries an error, and
    /// `SystemError` when the inner call itself returned `Err`. The inner
    /// result is then returned unchanged.
    #[fastrace::trace]
    pub async fn run_action_no_udf_log(
        &self,
        path: PublicFunctionPath,
        arguments: ConvexArray,
        identity: Identity,
        caller: FunctionCaller,
        usage_tracking: FunctionUsageTracker,
        context: ExecutionContext,
    ) -> anyhow::Result<ActionCompletion> {
        let result = self
            .run_action_inner(path, arguments, identity, caller, usage_tracking, context)
            .await;
        match result.as_ref() {
            Ok(completion) => {
                let result = if completion.outcome.result.is_ok() {
                    UdfExecutorResult::Success
                } else {
                    UdfExecutorResult::UserError
                };
                log_udf_executor_result(UdfType::Action, result);
            },
            Err(e) => {
                log_udf_executor_result(
                    UdfType::Action,
                    UdfExecutorResult::SystemError(e.metric_status_label_value()),
                );
            },
        };
        result
    }

    /// Runs the action without recording the executor-level result (that is
    /// done by `run_action_no_udf_log` around this call).
    ///
    /// The path and arguments are validated first, then the action is executed
    /// in the environment of its module (`Isolate` or `Node`). Log lines
    /// produced while the action runs are forwarded to
    /// `function_log.log_action_progress`.
    ///
    /// User-attributable failures are reported as `Ok(ActionCompletion)` whose
    /// outcome carries the `JsError`: this covers validation failures and any
    /// error for which `is_deterministic_user_error()` returns true. All other
    /// failures are returned as `Err`.
    #[fastrace::trace]
    async fn run_action_inner(
        &self,
        path: PublicFunctionPath,
        arguments: ConvexArray,
        identity: Identity,
        caller: FunctionCaller,
        usage_tracking: FunctionUsageTracker,
        context: ExecutionContext,
    ) -> anyhow::Result<ActionCompletion> {
        // System paths may only be called by admin or system identities.
        if path.is_system() && !(identity.is_admin() || identity.is_system()) {
            anyhow::bail!(unauthorized_error("action"));
        }

        let unix_timestamp = self.runtime.unix_timestamp();
        let start = self.runtime.monotonic_now();
        let mut tx = self
            .database
            .begin_with_usage(identity.clone(), usage_tracking)
            .await?;
        let validate_result = ValidatedPathAndArgs::new_with_returns_validator(
            caller.allowed_visibility(),
            &mut tx,
            path.clone(),
            arguments.clone(),
            UdfType::Action,
        )
        .await?;

        // Fetch the returns_validator now to be used at a later ts.
        let (path_and_args, returns_validator) = match validate_result {
            Ok((path_and_args, returns_validator)) => (path_and_args, returns_validator),
            Err(js_error) => {
                // Validation produced a user-visible error: report it as a
                // completed action that never ran (no environment, zero memory
                // and zero execution time, no log lines).
                return Ok(ActionCompletion {
                    outcome: ValidatedActionOutcome::from_error(
                        js_error,
                        path.debug_into_component_path(),
                        arguments,
                        identity.into(),
                        self.runtime.clone(),
                        None,
                    ),
                    environment: ModuleEnvironment::Invalid,
                    memory_in_mb: 0,
                    execution_time: Duration::from_secs(0),
                    context,
                    unix_timestamp,
                    caller,
                    log_lines: vec![].into(),
                });
            },
        };
        let component = path_and_args.path().component;

        // We should use table mappings from the same transaction as the output
        // validator was retrieved.
        let table_mapping = tx.table_mapping().namespace(component.into());
        let virtual_system_mapping = tx.virtual_system_mapping().clone();
        let udf_server_version = path_and_args.npm_version().clone();

        // We should not be missing the module given we validated the path above
        // which requires the module to exist.
        let path = path_and_args.path().clone();
        let module = ModuleModel::new(&mut tx)
            .get_metadata_for_function_by_id(&path)
            .await?
            .context("Missing a valid module")?;

        let (log_line_sender, log_line_receiver) = mpsc::unbounded_channel();

        // Captured before `tx` is handed to the executor so that it is still
        // available when building an error outcome at the end.
        let inert_identity = tx.inert_identity();
        let timer = function_total_timer(module.environment, UdfType::Action);

        let completion_result = match module.environment {
            ModuleEnvironment::Isolate => {
                // TODO: This is the only use case of clone. We should get rid of clone,
                // when we deprecate that codepath.
                let outcome_future = self
                    .isolate_functions
                    .execute_action(tx, path_and_args, log_line_sender, context.clone())
                    .boxed();
                // Drive the action while forwarding each log line as progress.
                let (outcome_result, log_lines) = run_function_and_collect_log_lines(
                    outcome_future,
                    log_line_receiver,
                    |log_line| {
                        self.function_log.log_action_progress(
                            path.clone().for_logging(),
                            unix_timestamp,
                            context.clone(),
                            vec![log_line].into(),
                            module.environment,
                        )
                    },
                )
                .await;

                // Reports the configured heap-size knob divided by 2^20
                // (presumably bytes to MiB - confirm), not measured usage.
                let memory_in_mb: u64 = (*ISOLATE_MAX_USER_HEAP_SIZE / (1 << 20))
                    .try_into()
                    .unwrap();
                let validated_outcome_result = outcome_result.map(|outcome| {
                    ValidatedActionOutcome::new(outcome, returns_validator, &table_mapping)
                });

                timer.finish();

                validated_outcome_result.map(|outcome| ActionCompletion {
                    outcome,
                    execution_time: start.elapsed(),
                    environment: ModuleEnvironment::Isolate,
                    memory_in_mb,
                    context: context.clone(),
                    unix_timestamp,
                    caller: caller.clone(),
                    log_lines,
                })
            },
            ModuleEnvironment::Node => {
                // We should not be missing the module given we validated the path above
                // which requires the module to exist.
                let module_path = CanonicalizedComponentModulePath {
                    component: path.component,
                    module_path: path.udf_path.module().clone(),
                };
                let component = path.component;
                let module_metadata = match ModuleModel::new(&mut tx)
                    .get_metadata(module_path.clone())
                    .await?
                {
                    Some(r) => r,
                    None => anyhow::bail!("Missing a valid module_version"),
                };
                let source_package = SourcePackageModel::new(&mut tx, component.into())
                    .get(module_metadata.source_package_id)
                    .await?;

                // Future handed to the Node executor below: it loads the module
                // through the module cache and yields its source map, if any,
                // keyed by module path.
                let source_maps_callback = async {
                    let module_version = self
                        .module_cache
                        .get_module_with_metadata(module_metadata, source_package)
                        .await?;
                    let mut source_maps = BTreeMap::new();
                    if let Some(source_map) = module_version.source_map.clone() {
                        source_maps.insert(module_path.module_path.clone(), source_map);
                    }
                    Ok(source_maps)
                };

                // The permit is bound to a named variable so it stays alive
                // until the end of this match arm.
                let _request_guard = self
                    .node_action_limiter
                    .acquire_permit_with_timeout(&self.runtime)
                    .await?;

                // Shadows the `source_package` above, which was passed by value
                // into `source_maps_callback`.
                let source_package_id = module.source_package_id;
                let source_package = SourcePackageModel::new(&mut tx, component.into())
                    .get(source_package_id)
                    .await?
                    .into_value();

                // System variables first, then user-defined ones on top.
                let mut environment_variables =
                    system_env_vars(&mut tx, self.default_system_env_vars.clone()).await?;
                let user_environment_variables =
                    EnvironmentVariablesModel::new(&mut tx).get_all().await?;
                environment_variables.extend(user_environment_variables);

                // Fetch source and external_deps presigned URI first
                let source_uri_future = self
                    .modules_storage
                    .signed_url(source_package.storage_key.clone(), Duration::from_secs(60));

                let (source_uri, external_deps_package) = if let Some(external_deps_package_id) =
                    source_package.external_deps_package_id
                {
                    let pkg = ExternalPackagesModel::new(&mut tx)
                        .get(external_deps_package_id)
                        .await?
                        .into_value();

                    let external_uri_future = self
                        .modules_storage
                        .signed_url(pkg.storage_key.clone(), Duration::from_secs(60));

                    // Sign both URLs concurrently.
                    let (source_uri, external_deps_uri) =
                        tokio::try_join!(source_uri_future, external_uri_future)?;
                    (
                        source_uri,
                        Some(node_executor::Package {
                            uri: external_deps_uri,
                            key: pkg.storage_key,
                            sha256: pkg.sha256,
                        }),
                    )
                } else {
                    (source_uri_future.await?, None)
                };

                // Local copy that the `map` closure below takes ownership of;
                // the outer `udf_server_version` remains available for the
                // error path after the match.
                let udf_server_version = path_and_args.npm_version().clone();
                let request = ExecuteRequest {
                    path_and_args,
                    source_package: node_executor::SourcePackage {
                        bundled_source: node_executor::Package {
                            uri: source_uri,
                            key: source_package.storage_key,
                            sha256: source_package.sha256,
                        },
                        external_deps: external_deps_package,
                    },
                    source_package_id,
                    user_identity: tx.user_identity(),
                    auth_header: token_to_authorization_header(tx.authentication_token())?,
                    environment_variables,
                    callback_token: self.key_broker.issue_action_token(path.component),
                    context: context.clone(),
                    encoded_parent_trace: EncodedSpan::from_parent().0,
                };

                let node_outcome_future = self
                    .node_actions
                    .execute(request, log_line_sender, source_maps_callback)
                    .boxed();
                // Drive the action while forwarding each log line as progress.
                let (mut node_outcome_result, log_lines) = run_function_and_collect_log_lines(
                    node_outcome_future,
                    log_line_receiver,
                    |log_line| {
                        self.function_log.log_action_progress(
                            path.clone().for_logging(),
                            unix_timestamp,
                            context.clone(),
                            vec![log_line].into(),
                            module.environment,
                        )
                    },
                )
                .await;

                timer.finish();

                // Check a successful return value against the returns
                // validator; a violation replaces the result with that error.
                if let Ok(ref mut node_outcome) = node_outcome_result {
                    if let Ok(ref output) = node_outcome.result {
                        if let Some(js_err) = returns_validator.check_output(
                            output,
                            &table_mapping,
                            &virtual_system_mapping,
                        ) {
                            node_outcome.result = Err(js_err);
                        }
                    }
                }

                node_outcome_result.map(|node_outcome| {
                    let outcome = ActionOutcome {
                        path: path.clone().for_logging(),
                        arguments: arguments.clone(),
                        identity: tx.inert_identity(),
                        unix_timestamp,
                        result: node_outcome.result.map(JsonPackedValue::pack),
                        syscall_trace: node_outcome.syscall_trace,
                        udf_server_version,
                    };
                    let outcome =
                        ValidatedActionOutcome::new(outcome, returns_validator, &table_mapping);
                    ActionCompletion {
                        outcome,
                        execution_time: start.elapsed(),
                        environment: ModuleEnvironment::Node,
                        memory_in_mb: node_outcome.memory_used_in_mb,
                        context: context.clone(),
                        unix_timestamp,
                        caller: caller.clone(),
                        log_lines,
                    }
                })
            },
            ModuleEnvironment::Invalid => {
                Err(anyhow::anyhow!("Attempting to run an invalid function"))
            },
        };

        match completion_result {
            Ok(c) => Ok(c),
            // Deterministic user errors become a completed action whose outcome
            // is the error, with no log lines attached.
            Err(e) if e.is_deterministic_user_error() => {
                let outcome = ValidatedActionOutcome::from_error(
                    JsError::from_error(e),
                    path.for_logging(),
                    arguments,
                    inert_identity,
                    self.runtime.clone(),
                    udf_server_version,
                );
                Ok(ActionCompletion {
                    outcome,
                    execution_time: start.elapsed(),
                    environment: module.environment,
                    memory_in_mb: match module.environment {
                        ModuleEnvironment::Isolate => (*ISOLATE_MAX_USER_HEAP_SIZE / (1 << 20))
                            .try_into()
                            .unwrap(),
                        // This isn't correct but we don't have a value to use here.
                        ModuleEnvironment::Node => 0,
                        ModuleEnvironment::Invalid => 0,
                    },
                    context,
                    unix_timestamp,
                    caller,
                    log_lines: vec![].into(),
                })
            },
            // Everything else is a system error.
            Err(e) => Err(e),
        }
    }

    /// Builds the given Node dependencies into an external-deps package.
    ///
    /// Obtains a presigned upload URL (valid for `BUILD_DEPS_TIMEOUT`) from
    /// module storage, asks the Node executor to build `deps` and upload the
    /// result there, and on success describes the uploaded package by its
    /// storage key, digest, dependency list and size.
    ///
    /// The outer `Err` covers storage or executor failures; the inner `Err`
    /// is the `JsError` returned by the build itself.
    #[fastrace::trace]
    pub async fn build_deps(
        &self,
        deps: Vec<NodeDependency>,
    ) -> anyhow::Result<Result<ExternalDepsPackage, JsError>> {
        let (object_key, upload_url) = self
            .modules_storage
            .presigned_upload_url(*BUILD_DEPS_TIMEOUT)
            .await?;
        let request = BuildDepsRequest {
            // Cloned because `deps` is also recorded in the returned package.
            deps: deps.clone(),
            upload_url,
        };
        let build_deps_res = self.node_actions.build_deps(request).await?;
        Ok(
            build_deps_res.map(move |(digest, package_size)| ExternalDepsPackage {
                storage_key: object_key,
                sha256: digest,
                deps,
                package_size,
            }),
        )
    }

    /// Evaluates the app definition together with its component definitions
    /// by delegating to the isolate function runner.
    ///
    /// `system_env_var_overrides` are layered on top of this runner's default
    /// system environment variables before the call.
    #[fastrace::trace]
    pub async fn evaluate_app_definitions(
        &self,
        app_definition: ModuleConfig,
        component_definitions: BTreeMap<ComponentDefinitionPath, ModuleConfig>,
        dependency_graph: BTreeSet<(ComponentDefinitionPath, ComponentDefinitionPath)>,
        user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
        system_env_var_overrides: BTreeMap<EnvVarName, EnvVarValue>,
    )
-> anyhow::Result<EvaluateAppDefinitionsResult> { let mut system_env_vars = self.default_system_env_vars.clone(); system_env_vars.extend(system_env_var_overrides); self.isolate_functions .function_runner .evaluate_app_definitions( app_definition, component_definitions, dependency_graph, user_environment_variables, system_env_vars, ) .await } #[fastrace::trace] pub async fn evaluate_component_initializer( &self, evaluated_definitions: BTreeMap<ComponentDefinitionPath, ComponentDefinitionMetadata>, path: ComponentDefinitionPath, definition: ModuleConfig, args: BTreeMap<Identifier, Resource>, name: ComponentName, ) -> anyhow::Result<BTreeMap<Identifier, Resource>> { self.isolate_functions .function_runner .evaluate_component_initializer(evaluated_definitions, path, definition, args, name) .await } #[fastrace::trace] pub async fn analyze( &self, udf_config: UdfConfig, new_modules: Vec<ModuleConfig>, source_package: SourcePackage, user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>, system_env_var_overrides: BTreeMap<EnvVarName, EnvVarValue>, ) -> anyhow::Result<Result<BTreeMap<CanonicalizedModulePath, AnalyzedModule>, JsError>> { let mut environment_variables = self.default_system_env_vars.clone(); environment_variables.extend(system_env_var_overrides); environment_variables.extend(user_environment_variables); let (node_modules, isolate_modules) = new_modules .into_iter() .map(|module| (module.path.clone().canonicalize(), module)) .partition(|(_, config)| config.environment == ModuleEnvironment::Node); let mut result = BTreeMap::new(); let isolate_future = self.isolate_functions.function_runner.analyze( udf_config, isolate_modules, environment_variables.clone(), ); let node_future = async { if node_modules.is_empty() { return Ok(Ok(BTreeMap::new())); } for path_str in ["schema.js", "crons.js", "http.js"] { let path = path_str .parse() .expect("Failed to parse static module names"); // The cli should not do this. Log as system error. 
anyhow::ensure!( !node_modules.contains_key(&path), "{path_str} can't be analyzed in Node.js!" ); } let mut source_maps = BTreeMap::new(); for (path, module) in node_modules.iter() { if let Some(source_map) = module.source_map.clone() { source_maps.insert(path.clone(), source_map); } } // Fetch source and external_deps presigned URI first let source_uri_future = self .modules_storage .signed_url(source_package.storage_key.clone(), Duration::from_secs(60)); let mut tx = self.database.begin_system().await?; let (source_uri, external_deps_package) = if let Some(external_deps_package_id) = source_package.external_deps_package_id { let pkg = ExternalPackagesModel::new(&mut tx) .get(external_deps_package_id) .await? .into_value(); let external_uri_future = self .modules_storage .signed_url(pkg.storage_key.clone(), Duration::from_secs(60)); let (source_uri, external_deps_uri) = tokio::try_join!(source_uri_future, external_uri_future)?; ( source_uri, Some(node_executor::Package { uri: external_deps_uri, key: pkg.storage_key, sha256: pkg.sha256, }), ) } else { (source_uri_future.await?, None) }; let request = AnalyzeRequest { source_package: node_executor::SourcePackage { bundled_source: node_executor::Package { uri: source_uri, key: source_package.storage_key, sha256: source_package.sha256, }, external_deps: external_deps_package, }, environment_variables, }; self.node_actions.analyze(request, &source_maps).await }; let (isolate_result, node_result) = tokio::try_join!(isolate_future, node_future)?; match isolate_result { Ok(modules) => result.extend(modules), Err(e) => return Ok(Err(e)), } match node_result { Ok(modules) => { for (path, analyzed_module) in modules { let exists = result.insert(path, analyzed_module).is_some(); // Note that although we send all modules to actions.analyze, it // currently ignores isolate modules. 
anyhow::ensure!(!exists, "actions.analyze returned isolate modules"); } }, Err(e) => return Ok(Err(e)), } self.validate_cron_jobs(&result)??; Ok(Ok(result)) } #[fastrace::trace] fn validate_cron_jobs( &self, modules: &BTreeMap<CanonicalizedModulePath, AnalyzedModule>, ) -> anyhow::Result<Result<(), JsError>> { // Validate that every cron job schedules an action or mutation. for module in modules.values() { let Some(crons) = module.cron_specs.as_ref() else { continue; }; for (identifier, cron_spec) in crons { let path = cron_spec.udf_path.module().clone(); let Some(scheduled_module) = modules.get(&path) else { return Ok(Err(JsError::from_message(format!( "The cron job '{identifier}' schedules a function that does not exist: {}", cron_spec.udf_path )))); }; let name = cron_spec.udf_path.function_name(); let Some(scheduled_function) = scheduled_module.functions.iter().find(|f| &f.name == name) else { return Ok(Err(JsError::from_message(format!( "The cron job '{identifier}' schedules a function that does not exist: {}", cron_spec.udf_path )))); }; match scheduled_function.udf_type { UdfType::Query => { return Ok(Err(JsError::from_message(format!( "The cron job '{identifier}' schedules a query function, only actions \ and mutations can be scheduled: {}", cron_spec.udf_path )))); }, UdfType::HttpAction => { return Ok(Err(JsError::from_message(format!( "The cron job '{identifier}' schedules an HTTP action, only actions \ and mutations can be scheduled: {}", cron_spec.udf_path )))); }, UdfType::Mutation => {}, UdfType::Action => {}, } } } Ok(Ok(())) } #[fastrace::trace] pub async fn evaluate_schema( &self, schema_bundle: ModuleSource, source_map: Option<SourceMap>, rng_seed: [u8; 32], unix_timestamp: UnixTimestamp, ) -> anyhow::Result<DatabaseSchema> { self.isolate_functions .function_runner .evaluate_schema(schema_bundle, source_map, rng_seed, unix_timestamp) .await } pub async fn evaluate_auth_config( &self, auth_config_bundle: ModuleSource, source_map: 
Option<SourceMap>,
        user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
        system_env_var_overrides: BTreeMap<EnvVarName, EnvVarValue>,
        explanation: &str,
    ) -> anyhow::Result<AuthConfig> {
        // Applied in order: defaults, then system overrides, then user values.
        let mut env_vars = self.default_system_env_vars.clone();
        env_vars.extend(system_env_var_overrides);
        env_vars.extend(user_environment_variables);

        self.isolate_functions
            .function_runner
            .evaluate_auth_config(auth_config_bundle, source_map, env_vars, explanation)
            .await
    }

    /// Enables the Node actions executor by delegating to `node_actions`.
    pub fn enable_actions(&self) -> anyhow::Result<()> {
        self.node_actions.enable()
    }

    /// Runs a query at timestamp `ts` and records the executor-level result.
    ///
    /// The work is done by `run_query_at_ts_inner`; this wrapper only reports
    /// `Success`, `UserError` (the query returned an error result) or
    /// `SystemError` (the inner call failed) and then hands back the inner
    /// result untouched.
    #[fastrace::trace]
    pub async fn run_query_at_ts(
        &self,
        request_id: RequestId,
        path: PublicFunctionPath,
        args: Vec<JsonValue>,
        identity: Identity,
        ts: Timestamp,
        journal: Option<QueryJournal>,
        caller: FunctionCaller,
    ) -> anyhow::Result<QueryReturn> {
        let query_result = self
            .run_query_at_ts_inner(request_id, path, args, identity, ts, journal, caller)
            .await;

        // Classify the result once, then log it.
        let executor_result = match &query_result {
            Ok(query_return) if query_return.result.is_ok() => UdfExecutorResult::Success,
            Ok(_) => UdfExecutorResult::UserError,
            Err(e) => UdfExecutorResult::SystemError(e.metric_status_label_value()),
        };
        log_udf_executor_result(UdfType::Query, executor_result);

        query_result
    }

    /// Authorizes the caller, parses the JSON arguments and serves the query
    /// through the cache manager.
    ///
    /// Arguments that fail to parse yield `Ok(QueryReturn)` carrying the
    /// `JsError`, with no log lines, an empty token at `ts` and a fresh
    /// journal.
    #[fastrace::trace]
    async fn run_query_at_ts_inner(
        &self,
        request_id: RequestId,
        path: PublicFunctionPath,
        args: Vec<JsonValue>,
        identity: Identity,
        ts: Timestamp,
        journal: Option<QueryJournal>,
        caller: FunctionCaller,
    ) -> anyhow::Result<QueryReturn> {
        // System paths may only be queried by admin or system identities.
        if path.is_system() && !identity.is_admin() && !identity.is_system() {
            anyhow::bail!(unauthorized_error("query"));
        }

        let parsed_args = match parse_udf_args(path.udf_path(), args) {
            Ok(parsed) => parsed,
            Err(js_error) => {
                return Ok(QueryReturn {
                    result: Err(js_error),
                    log_lines: vec![].into(),
                    token: Token::empty(ts),
                    journal: QueryJournal::new(),
                });
            },
        };

        let usage_tracker = FunctionUsageTracker::new();
        Ok(self
            .cache_manager
            .get(
                request_id,
                path,
                parsed_args,
                identity,
                ts,
                journal,
                caller,
                usage_tracker.clone(),
            )
            .await?)
    }

    /// Looks up whether the mutation identified by `mutation_identifier` has
    /// already been recorded for its session.
    ///
    /// Returns `Ok(None)` when no identifier is given or no record exists.
    /// Otherwise returns the recorded value, log lines and timestamp as
    /// `Ok(Some(Ok(MutationReturn)))`.
    #[fastrace::trace]
    async fn check_mutation_status(
        &self,
        tx: &mut Transaction<RT>,
        mutation_identifier: &Option<SessionRequestIdentifier>,
    ) -> anyhow::Result<Option<Result<MutationReturn, MutationError>>> {
        let Some(identifier) = mutation_identifier.as_ref() else {
            return Ok(None);
        };

        let recorded = SessionRequestModel::new(tx)
            .get_session_request_record(identifier, Identity::system())
            .await?;
        let Some((ts, SessionRequestOutcome::Mutation { result, log_lines })) = recorded else {
            return Ok(None);
        };

        // Seconds between the recorded timestamp and this transaction's begin.
        let age = tx.begin_timestamp().secs_since_f64(ts);
        tracing::info!(
            "Mutation already executed {age:.3}s ago so skipping {:?}",
            identifier
        );
        log_mutation_already_committed(age);

        Ok(Some(Ok(MutationReturn {
            value: result,
            log_lines,
            ts,
        })))
    }

    /// Records the outcome of a successful mutation for its session request.
    ///
    /// Does nothing when no identifier is given or when the outcome is an
    /// error.
    #[fastrace::trace]
    async fn write_mutation_status(
        &self,
        tx: &mut Transaction<RT>,
        mutation_identifier: &Option<SessionRequestIdentifier>,
        outcome: &ValidatedUdfOutcome,
    ) -> anyhow::Result<()> {
        let Some(identifier) = mutation_identifier.as_ref() else {
            return Ok(());
        };
        // Only successful results are recorded.
        let Ok(value) = outcome.result.as_ref() else {
            return Ok(());
        };

        let record = SessionRequestRecord {
            session_id: identifier.session_id,
            request_id: identifier.request_id,
            outcome: SessionRequestOutcome::Mutation {
                result: value.clone(),
                log_lines: outcome.log_lines.clone(),
            },
            identity: outcome.identity.clone(),
        };
        SessionRequestModel::new(tx)
            .record_session_request(record, Identity::system())
            .await?;
        Ok(())
    }

    /// Fails with a `BackendIsNotRunning` bad-request error when the backend
    /// state read through `tx` reports that it is stopped.
    async fn bail_if_backend_not_running(&self, tx: &mut Transaction<RT>) -> anyhow::Result<()> {
        let backend_state = BackendStateModel::new(tx).get_backend_state().await?;
        if !backend_state.is_stopped() {
            return Ok(());
        }
        anyhow::bail!(ErrorMetadata::bad_request(
            "BackendIsNotRunning",
            "Cannot perform this operation when the backend is not running"
        ));
    }
}

// Implementation of the callbacks declared by `ActionCallbacks`.
#[async_trait]
impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
#[fastrace::trace] async fn execute_query( &self, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, context: ExecutionContext, ) -> anyhow::Result<FunctionResult> { let ts = self.database.now_ts_for_reads(); let result = self .run_query_at_ts( context.request_id, PublicFunctionPath::Component(path), args.into_args()?, identity, *ts, None, FunctionCaller::Action { parent_scheduled_job: context.parent_scheduled_job, parent_execution_id: Some(context.execution_id), }, ) .await? .result; Ok(FunctionResult { result }) } #[fastrace::trace] async fn execute_mutation( &self, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, context: ExecutionContext, ) -> anyhow::Result<FunctionResult> { let result = self .retry_mutation( context.request_id, PublicFunctionPath::Component(path), args.into_args()?, identity, None, FunctionCaller::Action { parent_scheduled_job: context.parent_scheduled_job, parent_execution_id: Some(context.execution_id), }, None, ) .await .map(|r| match r { Ok(mutation_return) => Ok(mutation_return.value), Err(mutation_error) => Err(mutation_error.error), })?; Ok(FunctionResult { result }) } #[fastrace::trace] async fn execute_action( &self, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, context: ExecutionContext, ) -> anyhow::Result<FunctionResult> { let _tx = self.database.begin(identity.clone()).await?; let result = self .run_action( context.request_id, PublicFunctionPath::Component(path), args.into_args()?, identity, FunctionCaller::Action { parent_scheduled_job: context.parent_scheduled_job, parent_execution_id: Some(context.execution_id), }, ) .await .map(|r| match r { Ok(action_return) => Ok(action_return.value), Err(action_error) => Err(action_error.error), })?; Ok(FunctionResult { result }) } async fn storage_get_url( &self, identity: Identity, component: ComponentId, storage_id: FileStorageId, ) -> anyhow::Result<Option<String>> { let mut tx = 
self.database.begin(identity).await?; self.bail_if_backend_not_running(&mut tx).await?; self.file_storage .get_url(&mut tx, component, storage_id) .await } async fn storage_get_file_entry( &self, identity: Identity, component: ComponentId, storage_id: FileStorageId, ) -> anyhow::Result<Option<(ComponentPath, FileStorageEntry)>> { let mut tx = self.database.begin(identity).await?; self.bail_if_backend_not_running(&mut tx).await?; let Some(component_path) = tx.get_component_path(component) else { return Ok(None); }; let entry = self .file_storage .get_file_entry(&mut tx, component.into(), storage_id) .await?; Ok(entry.map(|e| (component_path, e))) } async fn storage_store_file_entry( &self, identity: Identity, component: ComponentId, entry: FileStorageEntry, ) -> anyhow::Result<(ComponentPath, DeveloperDocumentId)> { let mut tx = self.database.begin(identity.clone()).await?; self.bail_if_backend_not_running(&mut tx).await?; let (_ts, r, _stats) = self .database .execute_with_occ_retries( identity, FunctionUsageTracker::new(), "app_funrun_storage_store_file_entry", |tx| { async { let component_path = tx .get_component_path(component) .context(format!("Component {component:?} not found"))?; let id = self .file_storage .store_file_entry(tx, component.into(), entry.clone()) .await?; Ok((component_path, id)) } .into() }, ) .await?; Ok(r) } async fn storage_delete( &self, identity: Identity, component: ComponentId, storage_id: FileStorageId, ) -> anyhow::Result<()> { let mut tx = self.database.begin(identity.clone()).await?; self.bail_if_backend_not_running(&mut tx).await?; self.database .execute_with_occ_retries( identity, FunctionUsageTracker::new(), "app_funrun_storage_delete", |tx| { async { self.file_storage .delete(tx, component.into(), storage_id.clone()) .await?; Ok(()) } .into() }, ) .await?; Ok(()) } async fn schedule_job( &self, identity: Identity, scheduling_component: ComponentId, scheduled_path: CanonicalizedComponentFunctionPath, udf_args: SerializedArgs, 
scheduled_ts: UnixTimestamp, context: ExecutionContext, ) -> anyhow::Result<DeveloperDocumentId> { let (_ts, virtual_id, _stats) = self .database .execute_with_occ_retries( identity, FunctionUsageTracker::new(), "app_funrun_schedule_job", |tx| { let path = scheduled_path.clone(); let args = udf_args.clone(); let context = context.clone(); async move { let (path, udf_args) = validate_schedule_args( path, args.into_args()?, scheduled_ts, // Scheduling from actions is not transaction and happens at latest // timestamp. self.database.runtime().unix_timestamp(), tx, ) .await?; let virtual_id = VirtualSchedulerModel::new(tx, scheduling_component.into()) .schedule(path, udf_args, scheduled_ts, context) .await?; Ok(virtual_id) } .into() }, ) .await?; Ok(virtual_id) } async fn cancel_job( &self, identity: Identity, virtual_id: DeveloperDocumentId, ) -> anyhow::Result<()> { self.database .execute_with_occ_retries( identity, FunctionUsageTracker::new(), "app_funrun_cancel_job", |tx| { async { VirtualSchedulerModel::new(tx, TableNamespace::by_component_TODO()) .cancel(virtual_id) .await } .into() }, ) .await?; Ok(()) } async fn vector_search( &self, identity: Identity, query: JsonValue, ) -> anyhow::Result<(Vec<PublicVectorSearchQueryResult>, FunctionUsageStats)> { let query = VectorSearch::try_from(query).map_err(|e| { let message = e.to_string(); e.context(ErrorMetadata::bad_request("InvalidVectorQuery", message)) })?; self.database.vector_search(identity, query).await } async fn lookup_function_handle( &self, identity: Identity, handle: FunctionHandle, ) -> anyhow::Result<CanonicalizedComponentFunctionPath> { let mut tx = self.database.begin(identity).await?; FunctionHandlesModel::new(&mut tx).lookup(handle).await } async fn create_function_handle( &self, identity: Identity, path: CanonicalizedComponentFunctionPath, ) -> anyhow::Result<FunctionHandle> { let mut tx = self.database.begin(identity).await?; FunctionHandlesModel::new(&mut tx) .get_with_component_path(path) 
.await } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.