
Convex MCP server

Official
by get-convex
mod.rs (36.5 kB)
use common::{
    components::{CanonicalizedComponentFunctionPath, ResolvedComponentFunctionPath},
    execution_context::ExecutionContext,
};
use futures::{future::BoxFuture, select_biased, FutureExt};
use model::{
    environment_variables::types::{EnvVarName, EnvVarValue},
    modules::{module_versions::FullModuleSource, user_error::FunctionNotFoundError},
};
use tokio::sync::oneshot;
use udf::{helpers::serialize_udf_args, FunctionOutcome, SyscallTrace};

pub mod async_syscall;
mod phase;
pub mod syscall;

use std::{cmp::Ordering, collections::VecDeque, sync::Arc};

use anyhow::anyhow;
use common::{
    errors::JsError,
    identity::InertIdentity,
    knobs::{
        DATABASE_UDF_SYSTEM_TIMEOUT, DATABASE_UDF_USER_TIMEOUT, FUNCTION_MAX_ARGS_SIZE,
        FUNCTION_MAX_RESULT_SIZE, TRANSACTION_MAX_NUM_SCHEDULED, TRANSACTION_MAX_NUM_USER_WRITES,
        TRANSACTION_MAX_READ_SET_INTERVALS, TRANSACTION_MAX_READ_SIZE_BYTES,
        TRANSACTION_MAX_READ_SIZE_ROWS, TRANSACTION_MAX_SCHEDULED_TOTAL_ARGUMENT_SIZE_BYTES,
        TRANSACTION_MAX_USER_WRITE_SIZE_BYTES,
    },
    log_lines::{LogLevel, LogLine, LogLines},
    query_journal::QueryJournal,
    runtime::{Runtime, UnixTimestamp},
    types::{PersistenceVersion, UdfType},
    value::{ConvexArray, ConvexValue},
};
use database::{BiggestDocumentWrites, FunctionExecutionSize, Transaction, OVER_LIMIT_HELP};
use deno_core::{serde_v8, v8};
use errors::ErrorMetadata;
use file_storage::TransactionalFileStorage;
use keybroker::KeyBroker;
use rand::Rng;
use rand_chacha::ChaCha12Rng;
use serde_json::Value as JsonValue;
use udf::UdfOutcome;
use value::{
    heap_size::{HeapSize, WithHeapSize},
    JsonPackedValue, NamespacedTableMapping, Size, MAX_DOCUMENT_NESTING, MAX_USER_SIZE,
    VALUE_TOO_LARGE_SHORT_MSG,
};

use self::{
    async_syscall::{AsyncSyscallBatch, PendingSyscall, QueryManager},
    phase::UdfPhase,
    syscall::syscall_impl,
};
use super::{
    helpers::permit::with_release_permit,
    warnings::{approaching_duration_limit_warning, approaching_limit_warning, SystemWarning},
    ModuleCodeCacheResult,
};
use crate::{
    client::{EnvironmentData, SharedIsolateHeapStats, UdfCallback, UdfRequest},
    concurrency_limiter::ConcurrencyPermit,
    environment::{
        helpers::{module_loader::module_specifier_from_path, resolve_promise, MAX_LOG_LINES},
        udf::async_syscall::DatabaseSyscallsV1,
        AsyncOpRequest, IsolateEnvironment,
    },
    helpers::{self, deserialize_udf_result, pump_message_loop},
    isolate::{Isolate, IsolateHeapStats},
    metrics::{self, log_isolate_request_cancelled},
    request_scope::RequestScope,
    strings,
    termination::TerminationReason,
    timeout::{FunctionExecutionTime, Timeout},
};

pub struct DatabaseUdfEnvironment<RT: Runtime> {
    rt: RT,
    udf_type: UdfType,
    path: ResolvedComponentFunctionPath,
    arguments: ConvexArray,
    identity: InertIdentity,
    udf_server_version: Option<semver::Version>,
    client_id: String,
    phase: UdfPhase<RT>,
    file_storage: TransactionalFileStorage<RT>,
    query_manager: QueryManager<RT>,
    persistence_version: PersistenceVersion,
    key_broker: KeyBroker,
    log_lines: LogLines,
    /// Journal from a previous computation of this UDF used as an input to
    /// this UDF. If this is the first run, the journal will be blank.
    prev_journal: QueryJournal,
    /// Journal to write decisions made during this UDF computation.
    next_journal: QueryJournal,
    pending_syscalls: WithHeapSize<VecDeque<PendingSyscall>>,
    syscall_trace: SyscallTrace,
    heap_stats: SharedIsolateHeapStats,
    context: ExecutionContext,
    reactor_depth: usize,
    udf_callback: Box<dyn UdfCallback<RT>>,
}

fn not_allowed_in_udf(name: &str, description: &str) -> ErrorMetadata {
    ErrorMetadata::bad_request(
        format!("No{name}InQueriesOrMutations"),
        format!(
            "Can't use {description} in queries and mutations. Please consider using an action. \
             See https://docs.convex.dev/functions/actions for more details.",
        ),
    )
}

impl<RT: Runtime> IsolateEnvironment<RT> for DatabaseUdfEnvironment<RT> {
    fn trace(&mut self, level: LogLevel, messages: Vec<String>) -> anyhow::Result<()> {
        self.emit_log_line(LogLine::new_developer_log_line(
            level,
            messages,
            // Note: accessing the current time here is still deterministic since
            // we don't externalize the time to the function.
            self.rt.unix_timestamp(),
        ));
        Ok(())
    }

    fn rng(&mut self) -> anyhow::Result<&mut ChaCha12Rng> {
        self.phase.rng()
    }

    fn crypto_rng(&mut self) -> anyhow::Result<super::crypto_rng::CryptoRng> {
        anyhow::bail!(not_allowed_in_udf("CryptoRng", "cryptographic randomness"))
    }

    fn unix_timestamp(&mut self) -> anyhow::Result<UnixTimestamp> {
        self.phase.unix_timestamp()
    }

    fn get_environment_variable(
        &mut self,
        name: EnvVarName,
    ) -> anyhow::Result<Option<EnvVarValue>> {
        self.phase.get_environment_variable(name)
    }

    fn get_all_table_mappings(&mut self) -> anyhow::Result<NamespacedTableMapping> {
        let namespace = self.phase.component()?.into();
        let tx = self.phase.tx()?;
        Ok(tx.table_mapping().namespace(namespace))
    }

    async fn lookup_source(
        &mut self,
        path: &str,
        timeout: &mut Timeout<RT>,
        permit: &mut Option<ConcurrencyPermit>,
    ) -> anyhow::Result<Option<(Arc<FullModuleSource>, ModuleCodeCacheResult)>> {
        let user_module_path = path.parse()?;
        let result = self
            .phase
            .get_module(&user_module_path, timeout, permit)
            .await?;
        Ok(result)
    }

    fn syscall(&mut self, name: &str, args: JsonValue) -> anyhow::Result<JsonValue> {
        syscall_impl(self, name, args)
    }

    fn start_async_syscall(
        &mut self,
        name: String,
        args: JsonValue,
        resolver: v8::Global<v8::PromiseResolver>,
    ) -> anyhow::Result<()> {
        self.pending_syscalls.push_back(PendingSyscall {
            name,
            args,
            resolver,
        });
        Ok(())
    }

    fn start_async_op(
        &mut self,
        request: AsyncOpRequest,
        _resolver: v8::Global<v8::PromiseResolver>,
    ) -> anyhow::Result<()> {
        anyhow::bail!(not_allowed_in_udf(
            request.name_for_error(),
            &request.description_for_error(),
        ))
    }

    fn record_heap_stats(&self, mut isolate_stats: IsolateHeapStats) {
        // Add the memory allocated by the environment itself.
        isolate_stats.environment_heap_size =
            self.pending_syscalls.heap_size() + self.syscall_trace.heap_size();
        self.heap_stats.store(isolate_stats);
    }

    fn user_timeout(&self) -> std::time::Duration {
        *DATABASE_UDF_USER_TIMEOUT
    }

    fn system_timeout(&self) -> std::time::Duration {
        *DATABASE_UDF_SYSTEM_TIMEOUT
    }

    fn is_nested_function(&self) -> bool {
        self.reactor_depth > 0
    }
}

impl<RT: Runtime> DatabaseUdfEnvironment<RT> {
    pub fn new(
        rt: RT,
        EnvironmentData {
            key_broker,
            default_system_env_vars,
            file_storage,
            module_loader,
        }: EnvironmentData<RT>,
        heap_stats: SharedIsolateHeapStats,
        UdfRequest {
            path_and_args,
            udf_type,
            transaction,
            journal,
            context,
        }: UdfRequest<RT>,
        reactor_depth: usize,
        udf_callback: Box<dyn UdfCallback<RT>>,
        client_id: String,
    ) -> Self {
        let persistence_version = transaction.persistence_version();
        let (path, arguments, udf_server_version) = path_and_args.consume();
        let component = path.component;
        Self {
            rt: rt.clone(),
            udf_type,
            path,
            arguments,
            identity: transaction.inert_identity(),
            udf_server_version,
            phase: UdfPhase::new(
                transaction,
                rt,
                module_loader.clone(),
                default_system_env_vars,
                component,
            ),
            file_storage,
            query_manager: QueryManager::new(),
            persistence_version,
            key_broker,
            log_lines: vec![].into(),
            prev_journal: journal,
            next_journal: QueryJournal::new(),
            pending_syscalls: WithHeapSize::default(),
            syscall_trace: SyscallTrace::new(),
            heap_stats,
            context,
            reactor_depth,
            udf_callback,
            client_id,
        }
    }

    #[fastrace::trace]
    pub async fn run(
        mut self,
        client_id: String,
        isolate: &mut Isolate<RT>,
        v8_context: v8::Global<v8::Context>,
        isolate_clean: &mut bool,
        cancellation: BoxFuture<'_, ()>,
        function_started: Option<oneshot::Sender<()>>,
    ) -> anyhow::Result<(Transaction<RT>, FunctionOutcome)> {
        // Initialize the UDF's RNG from some high-quality entropy. As with
        // `unix_timestamp` below, the UDF is only deterministic modulo this
        // system-generated input.
        let rng_seed = self.rt.rng().random();
        let unix_timestamp = self.rt.unix_timestamp();
        let heap_stats = self.heap_stats.clone();

        // See Isolate::with_context for an explanation of this setup code. We can't use
        // that method directly since we want an `await` below, and passing in a
        // generic async closure to `Isolate` is currently difficult.
        let client_id = Arc::new(client_id);
        let path = self.path.clone();
        let (handle, state) = isolate.start_request(client_id, self).await?;
        if let Some(tx) = function_started {
            // At this point we have acquired a permit and aren't going to
            // reject the function for capacity reasons.
            _ = tx.send(());
        }
        let mut handle_scope = isolate.handle_scope();
        let v8_context = v8::Local::new(&mut handle_scope, v8_context);
        let mut context_scope = v8::ContextScope::new(&mut handle_scope, v8_context);
        let mut isolate_context =
            RequestScope::new(&mut context_scope, handle.clone(), state, false).await?;
        let mut result =
            Self::run_inner(&mut isolate_context, cancellation, rng_seed, unix_timestamp).await;

        // Perform a microtask checkpoint one last time before taking the environment
        // to ensure the microtask queue is empty. Otherwise, JS from this request may
        // leak to a subsequent one on isolate reuse.
        isolate_context.checkpoint();
        *isolate_clean = true;

        // Override the returned result if we hit a termination error.
        let termination_error = handle
            .take_termination_error(Some(heap_stats.get()), &format!("{:?}", path.for_logging()));
        match termination_error {
            Ok(Ok(..)) => (),
            Ok(Err(e)) => {
                result = Ok(Err(e));
            },
            Err(e) => {
                result = Err(e);
            },
        }

        // Our environment may be in an inconsistent state after a system error (e.g.
        // the transaction may be missing if we hit a system error during a
        // cross-component call), so be sure to error out here before using the
        // environment.
        let result = result?;

        let execution_time;
        (self, execution_time) = isolate_context.take_environment();
        let success_result_value = result.as_ref().ok();
        Self::add_warnings_to_log_lines(
            &self.path.clone().for_logging(),
            &self.arguments,
            execution_time,
            self.phase.execution_size()?,
            self.phase.biggest_document_writes()?,
            success_result_value,
            |warning| {
                self.log_lines.push(LogLine::new_system_log_line(
                    warning.level,
                    warning.messages,
                    // Note: accessing the current time here is still deterministic since
                    // we don't externalize the time to the function.
                    self.rt.unix_timestamp(),
                    warning.system_log_metadata,
                ));
            },
        )?;
        let outcome = match self.udf_type {
            UdfType::Query => FunctionOutcome::Query(UdfOutcome {
                path: self.path.for_logging(),
                arguments: self.arguments,
                identity: self.identity,
                observed_identity: self.phase.observed_identity(),
                rng_seed,
                observed_rng: self.phase.observed_rng(),
                unix_timestamp,
                observed_time: self.phase.observed_time(),
                log_lines: self.log_lines,
                journal: self.next_journal,
                result: match result {
                    Ok(v) => Ok(JsonPackedValue::pack(v)),
                    Err(e) => Err(e),
                },
                syscall_trace: self.syscall_trace,
                udf_server_version: self.udf_server_version,
            }),
            // TODO: Add num_writes and write_bandwidth to UdfOutcome,
            // and use them in log_mutation.
            UdfType::Mutation => FunctionOutcome::Mutation(UdfOutcome {
                path: self.path.for_logging(),
                arguments: self.arguments,
                identity: self.identity,
                observed_identity: self.phase.observed_identity(),
                rng_seed,
                observed_rng: self.phase.observed_rng(),
                unix_timestamp,
                observed_time: self.phase.observed_time(),
                log_lines: self.log_lines,
                journal: self.next_journal,
                result: match result {
                    Ok(v) => Ok(JsonPackedValue::pack(v)),
                    Err(e) => Err(e),
                },
                syscall_trace: self.syscall_trace,
                udf_server_version: self.udf_server_version,
            }),
            _ => anyhow::bail!("UdfEnvironment should only run queries and mutations"),
        };
        Ok((self.phase.into_transaction()?, outcome))
    }

    #[convex_macro::instrument_future]
    #[fastrace::trace]
    async fn run_inner(
        isolate: &mut RequestScope<'_, '_, RT, Self>,
        cancellation: BoxFuture<'_, ()>,
        rng_seed: [u8; 32],
        unix_timestamp: UnixTimestamp,
    ) -> anyhow::Result<Result<ConvexValue, JsError>> {
        let handle = isolate.handle();
        let mut v8_scope = isolate.scope();
        let mut scope = RequestScope::<RT, Self>::enter(&mut v8_scope);

        // Initialize the environment, preloading the UDF config, before executing any
        // JS.
        {
            let state = scope.state_mut()?;
            state
                .environment
                .phase
                .initialize(&mut state.timeout, &mut state.permit)
                .await?;
        }
        let (udf_type, path, udf_args) = {
            let state = scope.state()?;
            let environment = &state.environment;
            (
                environment.udf_type,
                environment.path.clone(),
                environment.arguments.clone(),
            )
        };
        let udf_path = path.udf_path.clone();

        // Don't allow directly running a UDF within the `_deps` directory. We don't
        // really expect users to hit this unless someone is trying to exploit
        // an app written on Convex by calling directly into a compromised
        // dependency. So, consider it a system error so we can just
        // keep a watch on it.
        if udf_path.module().is_deps() {
            anyhow::bail!("Refusing to run {udf_path:?} within the '_deps' directory");
        }

        // First, load the user's module and find the specified function.
        let module_path = udf_path.module().clone();
        let Ok(module_specifier) = module_specifier_from_path(&module_path) else {
            let message = format!("Invalid module path: {module_path:?}");
            return Ok(Err(JsError::from_message(message)));
        };
        let module = match scope
            .eval_user_module(udf_type, false, &module_specifier)
            .await?
        {
            Ok(id) => id,
            Err(e) => return Ok(Err(e)),
        };
        let namespace = module
            .get_module_namespace()
            .to_object(&mut scope)
            .ok_or_else(|| anyhow!("Module namespace wasn't an object?"))?;
        let function_name = udf_path.function_name();
        let function_str: v8::Local<'_, v8::Value> = v8::String::new(&mut scope, function_name)
            .ok_or_else(|| anyhow!("Failed to create function name string"))?
            .into();
        if namespace.has(&mut scope, function_str) != Some(true) {
            let message = format!(
                "{}",
                FunctionNotFoundError::new(udf_path.function_name(), udf_path.module().as_str())
            );
            return Ok(Err(JsError::from_message(message)));
        }
        let function: v8::Local<v8::Object> = namespace
            .get(&mut scope, function_str)
            .ok_or_else(|| anyhow!("Did not find function in module after checking?"))?
            .try_into()?;

        // Mutations and queries are wrapped in JavaScript by a function that adds a
        // property marking it as a query or mutation UDF.
        let is_mutation_str = strings::isMutation.create(&mut scope)?.into();
        let mut is_mutation = false;
        if let Some(true) = function.has(&mut scope, is_mutation_str) {
            is_mutation = function
                .get(&mut scope, is_mutation_str)
                .ok_or_else(|| anyhow!("Missing `is_mutation` after explicit check"))?
                .is_true();
        }
        let is_query_str = strings::isQuery.create(&mut scope)?.into();
        let mut is_query = false;
        if let Some(true) = function.has(&mut scope, is_query_str) {
            is_query = function
                .get(&mut scope, is_query_str)
                .ok_or_else(|| anyhow!("Missing `is_query` after explicit check"))?
                .is_true();
        }

        let invoke_query_str = strings::invokeQuery.create(&mut scope)?.into();
        let invoke_mutation_str = strings::invokeMutation.create(&mut scope)?.into();
        let invoke_str = match (udf_type, is_query, is_mutation) {
            (UdfType::Query, true, false) => invoke_query_str,
            (UdfType::Mutation, false, true) => invoke_mutation_str,
            (_, false, false) => {
                let message = format!(
                    "Function {udf_path:?} is neither a query nor a mutation. Did you forget to \
                     wrap it with `query` or `mutation`?"
                );
                return Ok(Err(JsError::from_message(message)));
            },
            (UdfType::Query, false, true) => {
                let message = format!(
                    "Function {udf_path:?} is registered as a mutation but is being run as a \
                     query."
                );
                return Ok(Err(JsError::from_message(message)));
            },
            (UdfType::Mutation, true, false) => {
                let message = format!(
                    "Function {udf_path:?} is registered as a query but is being run as a \
                     mutation."
                );
                return Ok(Err(JsError::from_message(message)));
            },
            _ => {
                anyhow::bail!(
                    "Unexpected function classification: {udf_type} vs. (is_query: {is_query}, \
                     is_mutation: {is_mutation})"
                );
            },
        };

        let args_str = serialize_udf_args(udf_args)?;
        metrics::log_argument_length(&args_str);
        let args_v8_str = v8::String::new(&mut scope, &args_str)
            .ok_or_else(|| anyhow!("Failed to create argument string"))?;
        let invoke: v8::Local<v8::Function> = function
            .get(&mut scope, invoke_str)
            .ok_or_else(|| anyhow!("Couldn't find invoke function in {udf_path:?}"))?
            .try_into()?;

        // Switch our phase to executing right before calling into the UDF.
        {
            let state = scope.state_mut()?;
            state
                .environment
                .phase
                .begin_execution(rng_seed, unix_timestamp)?;
        }
        let global = scope.get_current_context().global(&mut scope);
        let promise_r =
            scope.with_try_catch(|s| invoke.call(s, global.into(), &[args_v8_str.into()]));
        // If we hit a system error within a syscall, return `Err`, even if JS thinks it
        // returned successfully. The syscall layer uses
        // `scope.terminate_execution()` when we hit a system error, which
        // unfortunately doesn't actually terminate execution immediately. So, it's
        // possible for JS after the failed syscall to keep running and return a result
        // here before checking the termination flag.
        handle.check_terminated()?;
        let promise: v8::Local<v8::Promise> = match promise_r? {
            Ok(Some(v)) => v.try_into()?,
            Ok(None) => anyhow::bail!("Successful invocation returned None"),
            Err(e) => return Ok(Err(e)),
        };
        let mut cancellation = cancellation.fuse();
        loop {
            // Advance the user's promise as far as it can go by draining the microtask
            // queue.
            scope.perform_microtask_checkpoint();
            pump_message_loop(&mut scope);
            scope.record_heap_stats()?;
            handle.check_terminated()?;

            // Check for rejected promises still unhandled, if so terminate.
            let rejections = scope.pending_unhandled_promise_rejections_mut();
            if let Some(promise) = rejections.exceptions.keys().next().cloned() {
                let error = rejections.exceptions.remove(&promise).unwrap();
                let as_local = v8::Local::new(&mut scope, error);
                let err = match scope.format_traceback(as_local) {
                    Ok(e) => e,
                    Err(e) => {
                        handle.terminate_and_throw(TerminationReason::SystemError(Some(e)))?;
                    },
                };
                handle.terminate_and_throw(TerminationReason::UnhandledPromiseRejection(err))?;
            }
            if let v8::PromiseState::Rejected = promise.state() {
                // Stop execution immediately once we hit an error.
                break;
            }

            // If the user's promise is blocked, it must have a pending syscall.
            // Execute a batch of syscalls before reentering into JS.
            // These are executed in a batch deterministically, down to which fetches
            // hit the cache. AsyncSyscallBatch decides which syscalls can run in
            // a batch together.
            // Results are externalized to user space in FIFO order.
            let (resolvers, results) = {
                let state = scope.state_mut()?;
                let Some(p) = state.environment.pending_syscalls.pop_front() else {
                    // No syscalls or javascript to run, so we're done.
                    break;
                };
                let mut batch = AsyncSyscallBatch::new(p.name, p.args);
                let mut resolvers = vec![p.resolver];
                while let Some(p) = state.environment.pending_syscalls.front()
                    && batch.can_push(&p.name, &p.args)
                {
                    let p = state
                        .environment
                        .pending_syscalls
                        .pop_front()
                        .expect("should have a syscall");
                    batch.push(p.name, p.args)?;
                    resolvers.push(p.resolver);
                }
                // Pause the user-code UDF timeout for the duration of the syscall.
                // This works because we know that the user is blocked on some syscall,
                // so running the syscall is on us and we shouldn't count this time
                // towards the user timeout. When we allow more concurrency, we
                // may have to rework this.
                // NOTE: Even though we release the permit, the syscall still runs in v8.
                // It is better if we run it in tokio to avoid oversubscribing the CPU.
                // TODO: Consider running the async call from a tokio thread.
                // Even though the future would be blocking on the database most of the
                // time it still does some processing that might result in oversubscribing
                // the CPU threads dedicated to v8.
                let results = select_biased! {
                    _ = cancellation => {
                        log_isolate_request_cancelled();
                        anyhow::bail!("Cancelled");
                    },
                    results = with_release_permit(
                        &mut state.timeout,
                        &mut state.permit,
                        DatabaseSyscallsV1::run_async_syscall_batch(
                            &mut state.environment,
                            batch,
                        ).map(Ok),
                    ).fuse() => results?,
                };
                (resolvers, results)
            };

            // Every syscall must have a result (which could be an error or None).
            assert_eq!(resolvers.len(), results.len());

            // Complete the syscall's promise, which will put its handlers on the microtask
            // queue.
            for (resolver, result) in resolvers.into_iter().zip(results.into_iter()) {
                let mut result_scope = v8::HandleScope::new(&mut *scope);
                let result_v8 = match result {
                    Ok(v) => Ok(serde_v8::to_v8(&mut result_scope, v)?),
                    Err(e) => Err(e),
                };
                resolve_promise(&mut result_scope, resolver, result_v8)?;
            }
            handle.check_terminated()?;
        }

        // Check to see if the user's promise is blocked.
        let result = match promise.state() {
            v8::PromiseState::Pending => Err(JsError::from_message(
                "Returned promise will never resolve".to_string(),
            )),
            v8::PromiseState::Fulfilled => {
                anyhow::ensure!(
                    scope.state()?.environment.pending_syscalls.is_empty(),
                    "queries and mutations should run all syscalls to completion"
                );
                let promise_result_v8 = promise.result(&mut scope);
                let result_v8_str: v8::Local<v8::String> = promise_result_v8.try_into()?;
                let result_str = helpers::to_rust_string(&mut scope, &result_v8_str)?;
                metrics::log_result_length(&result_str);
                deserialize_udf_result(&path, &result_str)?
            },
            v8::PromiseState::Rejected => {
                let e = promise.result(&mut scope);
                Err(scope.format_traceback(e)?)
            },
        };
        Ok(result)
    }

    pub fn emit_log_line(&mut self, log_line: LogLine) {
        // - 1 to reserve for the [ERROR] log line
        match self.log_lines.len().cmp(&(MAX_LOG_LINES - 1)) {
            Ordering::Less => self.log_lines.push(log_line),
            Ordering::Equal => {
                drop(log_line);
                let log_line = LogLine::new_developer_log_line(
                    LogLevel::Error,
                    vec![format!(
                        "Log overflow (maximum {MAX_LOG_LINES}). Remaining log lines omitted."
                    )],
                    // Note: accessing the current time here is still deterministic since
                    // we don't externalize the time to the function.
                    self.rt.unix_timestamp(),
                );
                self.log_lines.push(log_line);
            },
            Ordering::Greater => (),
        }
    }

    pub fn emit_sub_function_log_lines(
        &mut self,
        path: CanonicalizedComponentFunctionPath,
        log_lines: LogLines,
    ) {
        // -1 to reserve for the [ERROR] log line
        if self.log_lines.len() > MAX_LOG_LINES - 1 {
            // We have previously exceeded the logging limit, so skip these logs.
            return;
        }
        if self.log_lines.len() + log_lines.len() > MAX_LOG_LINES - 1 {
            // We are about to exceed the logging limit, so truncate the logs.
            let allowed_length = MAX_LOG_LINES - 1 - self.log_lines.len();
            self.log_lines.push(LogLine::SubFunction {
                path,
                log_lines: log_lines.truncated(allowed_length),
            });
            let log_line = LogLine::new_developer_log_line(
                LogLevel::Error,
                vec![format!(
                    "Log overflow (maximum {MAX_LOG_LINES}). Remaining log lines omitted."
                )],
                // Note: accessing the current time here is still deterministic since
                // we don't externalize the time to the function.
                self.rt.unix_timestamp(),
            );
            self.log_lines.push(log_line);
        } else {
            self.log_lines
                .push(LogLine::SubFunction { path, log_lines });
        }
    }

    // Called when a function finishes
    pub fn add_warnings_to_log_lines(
        path: &CanonicalizedComponentFunctionPath,
        arguments: &ConvexArray,
        execution_time: FunctionExecutionTime,
        execution_size: FunctionExecutionSize,
        biggest_writes: Option<BiggestDocumentWrites>,
        result: Option<&ConvexValue>,
        mut trace_system_warning: impl FnMut(SystemWarning),
    ) -> anyhow::Result<()> {
        // let execution_size = self.phase.execution_size();
        // let biggest_writes = self.phase.biggest_document_writes();
        let udf_path = path.udf_path.clone();
        let system_udf_path = if udf_path.is_system() {
            Some(udf_path)
        } else {
            None
        };
        if let Some(warning) = approaching_limit_warning(
            arguments.size(),
            *FUNCTION_MAX_ARGS_SIZE,
            "TooLargeFunctionArguments",
            || "Large size of the function arguments".to_string(),
            None,
            Some(" bytes"),
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        if let Some(warning) = approaching_limit_warning(
            execution_size.read_size.total_document_count,
            *TRANSACTION_MAX_READ_SIZE_ROWS,
            "TooManyDocumentsRead",
            || "Many documents read in a single function execution".to_string(),
            Some(OVER_LIMIT_HELP),
            None,
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        if let Some(warning) = approaching_limit_warning(
            execution_size.num_intervals,
            *TRANSACTION_MAX_READ_SET_INTERVALS,
            "TooManyReads",
            || "Many reads in a single function execution".to_string(),
            Some(OVER_LIMIT_HELP),
            None,
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        if let Some(warning) = approaching_limit_warning(
            execution_size.read_size.total_document_size,
            *TRANSACTION_MAX_READ_SIZE_BYTES,
            "TooManyBytesRead",
            || "Many bytes read in a single function execution".to_string(),
            Some(OVER_LIMIT_HELP),
            Some(" bytes"),
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        if let Some(warning) = approaching_limit_warning(
            execution_size.write_size.num_writes,
            *TRANSACTION_MAX_NUM_USER_WRITES,
            "TooManyWrites",
            || "Many writes in a single function execution".to_string(),
            None,
            None,
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        if let Some(warning) = approaching_limit_warning(
            execution_size.write_size.size,
            *TRANSACTION_MAX_USER_WRITE_SIZE_BYTES,
            "TooManyBytesWritten",
            || "Many bytes written in a single function execution".to_string(),
            None,
            Some(" bytes"),
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        if let Some(warning) = approaching_limit_warning(
            execution_size.scheduled_size.num_writes,
            *TRANSACTION_MAX_NUM_SCHEDULED,
            "TooManyFunctionsScheduled",
            || "Many functions scheduled by this mutation".to_string(),
            None,
            None,
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        if let Some(warning) = approaching_limit_warning(
            execution_size.scheduled_size.size,
            *TRANSACTION_MAX_SCHEDULED_TOTAL_ARGUMENT_SIZE_BYTES,
            "ScheduledFunctionsArgumentsTooLarge",
            || {
                "Large total size of the arguments of scheduled functions from this mutation"
                    .to_string()
            },
            None,
            Some(" bytes"),
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        if let Some(biggest_writes) = biggest_writes {
            let (max_size_document_id, max_size) = biggest_writes.max_size;
            if let Some(warning) = approaching_limit_warning(
                max_size,
                MAX_USER_SIZE,
                VALUE_TOO_LARGE_SHORT_MSG,
                || format!("Large document written with ID \"{max_size_document_id}\""),
                None,
                Some(" bytes"),
                system_udf_path.as_ref(),
            )? {
                trace_system_warning(warning);
            }
            let (max_nesting_document_id, max_nesting) = biggest_writes.max_nesting;
            if let Some(warning) = approaching_limit_warning(
                max_nesting,
                MAX_DOCUMENT_NESTING,
                "TooNested",
                || format!("Deeply nested document written with ID \"{max_nesting_document_id}\""),
                None,
                Some(" levels"),
                system_udf_path.as_ref(),
            )? {
                trace_system_warning(warning);
            }
        }
        if let Some(result) = result {
            if let Some(warning) = approaching_limit_warning(
                result.size(),
                *FUNCTION_MAX_RESULT_SIZE,
                "TooLargeFunctionResult",
                || "Large size of the function return value".to_string(),
                None,
                Some(" bytes"),
                system_udf_path.as_ref(),
            )? {
                trace_system_warning(warning);
            }
        };
        if let Some(warning) = approaching_duration_limit_warning(
            execution_time.elapsed,
            execution_time.limit,
            "UserTimeout",
            "Function execution took a long time",
            system_udf_path.as_ref(),
        )? {
            trace_system_warning(warning);
        }
        Ok(())
    }
}
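
A note on the log-line cap used by emit_log_line and emit_sub_function_log_lines above: developer lines are kept up to MAX_LOG_LINES - 1, and the final slot is reserved for a single overflow error line. The following is a minimal, standalone sketch of just that truncation behavior, using a plain Vec<String> and a hypothetical value for MAX_LOG_LINES in place of the crate's LogLine/LogLines types and its real constant:

use std::cmp::Ordering;

// Hypothetical cap for illustration only; the real constant is
// crate::environment::helpers::MAX_LOG_LINES.
const MAX_LOG_LINES: usize = 256;

// Mirrors the Ordering-based branch in `emit_log_line`: push developer lines
// while there is room, emit one overflow line when the reserved slot is hit,
// and silently drop everything after that.
fn emit_log_line(log_lines: &mut Vec<String>, line: String) {
    match log_lines.len().cmp(&(MAX_LOG_LINES - 1)) {
        Ordering::Less => log_lines.push(line),
        Ordering::Equal => log_lines.push(format!(
            "Log overflow (maximum {MAX_LOG_LINES}). Remaining log lines omitted."
        )),
        Ordering::Greater => (),
    }
}

fn main() {
    let mut log_lines = Vec::new();
    for i in 0..1_000 {
        emit_log_line(&mut log_lines, format!("line {i}"));
    }
    // At most MAX_LOG_LINES entries are ever kept, and the last one is the
    // overflow notice.
    assert_eq!(log_lines.len(), MAX_LOG_LINES);
    assert!(log_lines.last().unwrap().starts_with("Log overflow"));
}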
