Convex MCP server

Official
by get-convex
lib.rs (119 kB)
#![feature(try_blocks)] #![feature(iterator_try_collect)] #![feature(let_chains)] #![feature(coroutines)] #![feature(round_char_boundary)] #![feature(duration_constructors)] #![feature(duration_constructors_lite)] #![feature(assert_matches)] use std::{ collections::{ BTreeMap, BTreeSet, HashSet, }, ops::Bound, sync::Arc, time::{ Duration, SystemTime, UNIX_EPOCH, }, }; use ::exports::interface::ExportProvider; use ::log_streaming::{ LogManager, LogManagerClient, }; use airbyte_import::{ AirbyteRecord, PrimaryKey, ValidatedAirbyteStream, }; use anyhow::Context; use authentication::{ application_auth::ApplicationAuth, validate_id_token, AuthIdToken, }; use aws_s3::storage::S3Storage; use bytes::Bytes; use chrono::{ DateTime, Utc, }; use common::{ auth::{ AuthConfig, AuthInfo, }, bootstrap_model::{ components::handles::FunctionHandle, index::{ database_index::IndexedFields, index_validation_error, IndexMetadata, }, schema::{ invalid_schema_id, parse_schema_id, SchemaState, }, }, components::{ CanonicalizedComponentFunctionPath, CanonicalizedComponentModulePath, ComponentDefinitionPath, ComponentId, ComponentPath, PublicFunctionPath, Reference, Resource, }, document::{ DocumentUpdate, CREATION_TIME_FIELD_PATH, }, errors::{ report_error, JsError, }, http::{ fetch::FetchClient, RequestDestination, }, knobs::{ APPLICATION_MAX_CONCURRENT_UPLOADS, ENABLE_INDEX_BACKFILL, MAX_JOBS_CANCEL_BATCH, MAX_USER_MODULES, }, log_lines::LogLines, log_streaming::LogSender, paths::FieldPath, persistence::Persistence, query::{ IndexRange, IndexRangeExpression, Order, }, query_journal::QueryJournal, runtime::{ shutdown_and_join, JoinSet, Runtime, SpawnHandle, UnixTimestamp, }, schemas::{ DatabaseSchema, TableDefinition, }, shutdown::ShutdownSignal, types::{ env_var_limit_met, env_var_name_not_unique, AllowedVisibility, ConvexOrigin, ConvexSite, CursorMs, EnvVarName, EnvVarValue, FullyQualifiedObjectKey, FunctionCaller, IndexId, IndexName, ModuleEnvironment, NodeDependency, ObjectKey, RepeatableTimestamp, TableName, Timestamp, UdfIdentifier, UdfType, ENV_VAR_LIMIT, }, RequestId, }; use convex_fivetran_destination::{ api_types::{ BatchWriteRow, DeleteType, }, constants::FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR, }; use cron_jobs::CronJobExecutor; use database::{ unauthorized_error, BootstrapComponentsModel, Database, FastForwardIndexWorker, IndexModel, IndexWorker, OccRetryStats, ResolvedQuery, SchemaModel, SearchIndexWorkers, Snapshot, Subscription, TableModel, Token, Transaction, UserFacingModel, WriteSource, }; use either::Either; use errors::{ ErrorMetadata, ErrorMetadataAnyhowExt, }; use fastrace::{ collector::SpanContext, func_path, future::FutureExt, Span, }; use file_storage::{ FileRangeStream, FileStorage, FileStream, }; use function_log::{ FunctionExecution, FunctionExecutionPart, }; use function_runner::FunctionRunner; use futures::stream::BoxStream; use headers::{ ContentLength, ContentType, }; use http_client::{ cached_http_client_for, ClientPurpose, }; use isolate::helpers::source_map_from_slice; use keybroker::{ Identity, KeyBroker, }; use log_streaming::add_local_log_sink_on_startup; use maplit::{ btreemap, btreeset, }; use model::{ airbyte_import::{ AirbyteImportModel, AIRBYTE_PRIMARY_KEY_INDEX_DESCRIPTOR, }, auth::AuthInfoModel, backend_info::BackendInfoModel, backend_state::BackendStateModel, canonical_urls::{ types::CanonicalUrl, CanonicalUrlsModel, }, components::{ config::ComponentConfigModel, handles::FunctionHandlesModel, types::ProjectConfig, ComponentsModel, }, config::{ 
module_loader::ModuleLoader, types::{ ConfigFile, ConfigMetadata, ModuleConfig, AUTH_CONFIG_FILE_NAME, }, ConfigModel, }, database_globals::{ types::StorageTagInitializer, DatabaseGlobalsModel, }, deployment_audit_log::{ types::DeploymentAuditLogEvent, DeploymentAuditLogModel, }, environment_variables::{ types::EnvironmentVariable, EnvironmentVariablesModel, }, exports::{ types::{ Export, ExportFormat, ExportRequestor, }, ExportsModel, }, external_packages::{ types::{ ExternalDepsPackage, ExternalDepsPackageId, }, ExternalPackagesModel, }, file_storage::{ types::FileStorageEntry, FileStorageId, }, fivetran_import::FivetranImportModel, migrations::MigrationWorker, modules::{ module_versions::{ AnalyzedModule, Visibility, }, ModuleModel, }, scheduled_jobs::SchedulerModel, session_requests::types::SessionRequestIdentifier, snapshot_imports::types::{ ImportFormat, ImportMode, ImportRequestor, }, source_packages::{ types::{ NodeVersion, PackageSize, SourcePackage, }, upload_download::upload_package, SourcePackageModel, }, udf_config::{ types::UdfConfig, UdfConfigModel, }, }; use node_executor::Actions; use parking_lot::Mutex; use rand::Rng; use scheduled_jobs::ScheduledJobRunner; use schema_worker::SchemaWorker; use search::{ query::RevisionWithKeys, searcher::{ Searcher, SegmentTermMetadataFetcher, }, }; use semver::Version; use serde_json::Value as JsonValue; use short_future::ShortBoxFuture; use snapshot_import::{ clear_tables, start_stored_import, }; use storage::{ BufferedUpload, ClientDrivenUploadPartToken, ClientDrivenUploadToken, LocalDirStorage, Storage, StorageExt, StorageGetStream, StorageUseCase, Upload, }; use sync_types::{ AuthenticationToken, CanonicalizedModulePath, CanonicalizedUdfPath, FunctionName, ModulePath, SerializedQueryJournal, }; use system_table_cleanup::SystemTableCleanupWorker; use table_summary_worker::{ TableSummaryClient, TableSummaryWorker, }; use tokio::sync::{ oneshot, Semaphore, }; use udf::{ environment::{ system_env_var_overrides, CONVEX_ORIGIN, CONVEX_SITE, }, helpers::parse_udf_args, HttpActionRequest, HttpActionResponseStreamer, HttpActionResult, }; use udf_metrics::{ MetricsWindow, Percentile, Timeseries, }; use usage_tracking::{ FunctionUsageStats, FunctionUsageTracker, UsageCounter, }; use value::{ id_v6::DeveloperDocumentId, sha256::Sha256Digest, JsonPackedValue, Namespace, ResolvedDocumentId, TableNamespace, }; use vector::{ PublicVectorSearchQueryResult, VectorSearch, }; use crate::{ application_function_runner::ApplicationFunctionRunner, exports::worker::ExportWorker, function_log::{ FunctionExecutionLog, TableRate, UdfMetricSummary, UdfRate, }, log_visibility::LogVisibility, module_cache::ModuleCache, redaction::{ RedactedJsError, RedactedLogLines, }, snapshot_import::SnapshotImportWorker, }; pub mod airbyte_import; pub mod api; pub mod application_function_runner; mod cache; pub mod cron_jobs; pub mod deploy_config; mod exports; pub mod function_log; mod log_streaming; pub mod log_visibility; mod metrics; mod module_cache; pub mod redaction; pub mod scheduled_jobs; mod schema_worker; pub mod snapshot_import; mod streaming_export; mod system_table_cleanup; mod table_summary_worker; pub mod valid_identifier; #[cfg(any(test, feature = "testing"))] pub mod test_helpers; #[cfg(test)] mod tests; pub use crate::cache::QueryCache; use crate::metrics::{ log_external_deps_package, log_source_package_size_bytes_total, }; pub struct ConfigMetadataAndSchema { pub config_metadata: ConfigMetadata, pub schema: Option<DatabaseSchema>, } #[derive(Clone)] pub 
struct ApplyConfigArgs { pub auth_module: Option<ModuleConfig>, pub config_file: ConfigFile, pub schema_id: Option<String>, pub modules: Vec<ModuleConfig>, pub udf_config: UdfConfig, pub source_package: SourcePackage, pub analyze_results: BTreeMap<CanonicalizedModulePath, AnalyzedModule>, } #[derive(Debug)] pub struct QueryReturn { pub result: Result<JsonPackedValue, JsError>, pub log_lines: LogLines, pub token: Token, pub journal: QueryJournal, } #[derive(Debug)] pub struct RedactedQueryReturn { pub result: Result<JsonPackedValue, RedactedJsError>, pub log_lines: RedactedLogLines, pub token: Token, pub journal: SerializedQueryJournal, } #[derive(Debug)] pub struct MutationReturn { pub value: JsonPackedValue, pub log_lines: LogLines, pub ts: Timestamp, } #[derive(Debug)] pub struct RedactedMutationReturn { pub value: JsonPackedValue, pub log_lines: RedactedLogLines, pub ts: Timestamp, } #[derive(thiserror::Error, Debug)] #[error("Mutation failed: {error}")] pub struct MutationError { pub error: JsError, pub log_lines: LogLines, } #[derive(thiserror::Error, Debug)] #[error("Mutation failed: {error}")] pub struct RedactedMutationError { pub error: RedactedJsError, pub log_lines: RedactedLogLines, } #[derive(Debug)] pub struct ActionReturn { pub value: JsonPackedValue, pub log_lines: LogLines, } #[derive(Debug)] pub struct RedactedActionReturn { pub value: JsonPackedValue, pub log_lines: RedactedLogLines, } #[derive(thiserror::Error, Debug)] #[error("Action failed: {error}")] pub struct ActionError { pub error: JsError, pub log_lines: LogLines, } #[derive(thiserror::Error, Debug)] #[error("Action failed: {error}")] pub struct RedactedActionError { pub error: RedactedJsError, pub log_lines: RedactedLogLines, } #[derive(Debug)] pub struct FunctionReturn { pub value: JsonPackedValue, pub log_lines: RedactedLogLines, } #[derive(thiserror::Error, Debug)] #[error("Function failed: {error}")] pub struct FunctionError { pub error: RedactedJsError, pub log_lines: RedactedLogLines, } // Ordered so that all unsets come before sets #[derive(PartialEq, Eq, PartialOrd, Ord)] pub enum EnvVarChange { Unset(EnvVarName), Set(EnvironmentVariable), } #[derive(Clone)] pub struct ApplicationStorage { pub files_storage: Arc<dyn Storage>, pub modules_storage: Arc<dyn Storage>, search_storage: Arc<dyn Storage>, pub exports_storage: Arc<dyn Storage>, snapshot_imports_storage: Arc<dyn Storage>, } pub struct Application<RT: Runtime> { runtime: RT, database: Database<RT>, runner: Arc<ApplicationFunctionRunner<RT>>, function_log: FunctionExecutionLog<RT>, file_storage: FileStorage<RT>, application_storage: ApplicationStorage, usage_tracking: UsageCounter, key_broker: KeyBroker, instance_name: String, scheduled_job_runner: ScheduledJobRunner, cron_job_executor: Arc<Mutex<Box<dyn SpawnHandle>>>, index_worker: Arc<Mutex<Option<Box<dyn SpawnHandle>>>>, fast_forward_worker: Arc<Mutex<Box<dyn SpawnHandle>>>, search_worker: Arc<Mutex<SearchIndexWorkers>>, search_and_vector_bootstrap_worker: Arc<Mutex<Box<dyn SpawnHandle>>>, table_summary_worker: TableSummaryClient, schema_worker: Arc<Mutex<Box<dyn SpawnHandle>>>, snapshot_import_worker: Arc<Mutex<Option<Box<dyn SpawnHandle>>>>, export_worker: Arc<Mutex<Option<Box<dyn SpawnHandle>>>>, system_table_cleanup_worker: Arc<Mutex<Box<dyn SpawnHandle>>>, migration_worker: Arc<Mutex<Option<Box<dyn SpawnHandle>>>>, log_visibility: Arc<dyn LogVisibility<RT>>, module_cache: ModuleCache<RT>, system_env_var_names: HashSet<EnvVarName>, app_auth: Arc<ApplicationAuth>, log_manager_client: 
LogManagerClient, } impl<RT: Runtime> Clone for Application<RT> { fn clone(&self) -> Self { Self { runtime: self.runtime.clone(), database: self.database.clone(), runner: self.runner.clone(), function_log: self.function_log.clone(), file_storage: self.file_storage.clone(), application_storage: self.application_storage.clone(), usage_tracking: self.usage_tracking.clone(), key_broker: self.key_broker.clone(), instance_name: self.instance_name.clone(), scheduled_job_runner: self.scheduled_job_runner.clone(), cron_job_executor: self.cron_job_executor.clone(), index_worker: self.index_worker.clone(), fast_forward_worker: self.fast_forward_worker.clone(), search_worker: self.search_worker.clone(), search_and_vector_bootstrap_worker: self.search_and_vector_bootstrap_worker.clone(), table_summary_worker: self.table_summary_worker.clone(), schema_worker: self.schema_worker.clone(), snapshot_import_worker: self.snapshot_import_worker.clone(), export_worker: self.export_worker.clone(), system_table_cleanup_worker: self.system_table_cleanup_worker.clone(), migration_worker: self.migration_worker.clone(), log_visibility: self.log_visibility.clone(), module_cache: self.module_cache.clone(), system_env_var_names: self.system_env_var_names.clone(), app_auth: self.app_auth.clone(), log_manager_client: self.log_manager_client.clone(), } } } /// Create storage based on the storage type configuration pub async fn create_storage<RT: Runtime>( runtime: RT, storage_type: &model::database_globals::types::StorageType, use_case: StorageUseCase, ) -> anyhow::Result<Arc<dyn Storage>> { Ok(match storage_type { model::database_globals::types::StorageType::S3 { s3_prefix } => { Arc::new(S3Storage::for_use_case(use_case, s3_prefix.clone(), runtime).await?) }, model::database_globals::types::StorageType::Local { dir } => { let storage = LocalDirStorage::for_use_case(runtime, dir, use_case)?; tracing::info!("{use_case} storage path: {:?}", storage.path()); Arc::new(storage) }, }) } impl<RT: Runtime> Application<RT> { pub async fn initialize_storage( runtime: RT, database: &Database<RT>, storage_tag_initializer: StorageTagInitializer, instance_name: String, ) -> anyhow::Result<ApplicationStorage> { let storage_type = { let mut tx = database.begin_system().await?; let storage_type = DatabaseGlobalsModel::new(&mut tx) .initialize_storage_tag(storage_tag_initializer, instance_name) .await?; database .commit_with_write_source(tx, "init_storage") .await?; storage_type }; let files_storage = create_storage(runtime.clone(), &storage_type, StorageUseCase::Files).await?; let modules_storage = create_storage(runtime.clone(), &storage_type, StorageUseCase::Modules).await?; let search_storage = create_storage( runtime.clone(), &storage_type, StorageUseCase::SearchIndexes, ) .await?; let exports_storage = create_storage(runtime.clone(), &storage_type, StorageUseCase::Exports).await?; let snapshot_imports_storage = create_storage( runtime.clone(), &storage_type, StorageUseCase::SnapshotImports, ) .await?; // Search storage needs to be set for Database to be fully initialized database.set_search_storage(search_storage.clone()); tracing::info!("{:?} storage is configured.", storage_type); Ok(ApplicationStorage { files_storage, modules_storage, search_storage, exports_storage, snapshot_imports_storage, }) } pub async fn new( runtime: RT, database: Database<RT>, file_storage: FileStorage<RT>, application_storage: ApplicationStorage, usage_tracking: UsageCounter, key_broker: KeyBroker, instance_name: String, function_runner: Arc<dyn 
FunctionRunner<RT>>, convex_origin: ConvexOrigin, convex_site: ConvexSite, searcher: Arc<dyn Searcher>, segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>, persistence: Arc<dyn Persistence>, node_actions: Actions<RT>, log_visibility: Arc<dyn LogVisibility<RT>>, app_auth: Arc<ApplicationAuth>, cache: QueryCache, fetch_client: Arc<dyn FetchClient>, local_log_sink: Option<String>, lease_lost_shutdown: ShutdownSignal, export_provider: Arc<dyn ExportProvider<RT>>, ) -> anyhow::Result<Self> { let module_cache = ModuleCache::new(runtime.clone(), application_storage.modules_storage.clone()).await; let module_loader = Arc::new(module_cache.clone()); let default_system_env_vars = btreemap! { CONVEX_ORIGIN.clone() => convex_origin.parse()?, CONVEX_SITE.clone() => convex_site.parse()? }; let mut index_worker = Arc::new(Mutex::new(None)); if *ENABLE_INDEX_BACKFILL { let index_worker_fut = IndexWorker::new( runtime.clone(), persistence.clone(), database.retention_validator(), database.clone(), ); index_worker = Arc::new(Mutex::new(Some( runtime.spawn("index_worker", index_worker_fut), ))); }; let fast_forward_worker = FastForwardIndexWorker::create_and_start(runtime.clone(), database.clone()); let fast_forward_worker = Arc::new(Mutex::new( runtime.spawn("fast_forward_worker", fast_forward_worker), )); let search_worker = SearchIndexWorkers::create_and_start( runtime.clone(), database.clone(), persistence.reader(), application_storage.search_storage.clone(), searcher, segment_term_metadata_fetcher, ); let search_worker = Arc::new(Mutex::new(search_worker)); let search_and_vector_bootstrap_worker = Arc::new(Mutex::new(database.start_search_and_vector_bootstrap())); let table_summary_worker = TableSummaryWorker::start( runtime.clone(), database.clone(), persistence.clone(), lease_lost_shutdown, ); let schema_worker = Arc::new(Mutex::new(runtime.spawn( "schema_worker", SchemaWorker::start(runtime.clone(), database.clone()), ))); let system_table_cleanup_worker = SystemTableCleanupWorker::new( runtime.clone(), database.clone(), application_storage.exports_storage.clone(), ); let system_table_cleanup_worker = Arc::new(Mutex::new( runtime.spawn("system_table_cleanup_worker", system_table_cleanup_worker), )); // If local_log_sink is passed in, this is a local instance, so we enable log // streaming by default. Otherwise, it's hard to grant the // entitlement in testing and in load generator. If not local, we // read the entitlement from the database. let mut tx = database.begin(Identity::system()).await?; let log_streaming_allowed = if let Some(path) = local_log_sink { add_local_log_sink_on_startup(database.clone(), path).await?; true } else { let mut bi = BackendInfoModel::new(&mut tx); bi.is_log_streaming_allowed().await? 
}; let log_manager_client = LogManager::start( runtime.clone(), database.clone(), fetch_client.clone(), instance_name.clone(), log_streaming_allowed, ) .await; let function_log = FunctionExecutionLog::new( runtime.clone(), database.usage_counter(), Arc::new(log_manager_client.clone()), ); let runner = Arc::new(ApplicationFunctionRunner::new( runtime.clone(), database.clone(), key_broker.clone(), function_runner.clone(), node_actions, file_storage.transactional_file_storage.clone(), application_storage.modules_storage.clone(), module_loader, function_log.clone(), default_system_env_vars.clone(), cache, )); function_runner.set_action_callbacks(runner.clone()); let scheduled_job_runner = ScheduledJobRunner::start( runtime.clone(), instance_name.clone(), database.clone(), runner.clone(), function_log.clone(), ); let cron_job_executor_fut = CronJobExecutor::run( runtime.clone(), instance_name.clone(), database.clone(), runner.clone(), function_log.clone(), ); let cron_job_executor = Arc::new(Mutex::new( runtime.spawn("cron_job_executor", cron_job_executor_fut), )); let export_worker = ExportWorker::new( runtime.clone(), database.clone(), application_storage.exports_storage.clone(), application_storage.files_storage.clone(), export_provider, database.usage_counter(), instance_name.clone(), ); let export_worker = Arc::new(Mutex::new(Some( runtime.spawn("export_worker", export_worker), ))); let snapshot_import_worker = SnapshotImportWorker::start( runtime.clone(), database.clone(), application_storage.snapshot_imports_storage.clone(), file_storage.clone(), database.usage_counter(), ); let snapshot_import_worker = Arc::new(Mutex::new(Some( runtime.spawn("snapshot_import_worker", snapshot_import_worker), ))); let migration_worker = MigrationWorker::new( runtime.clone(), persistence.clone(), database.clone(), application_storage.modules_storage.clone(), ); let migration_worker = Arc::new(Mutex::new(Some( runtime.spawn("migration_worker", migration_worker.go()), ))); Ok(Self { runtime, database, runner, function_log, file_storage, application_storage, usage_tracking, key_broker, scheduled_job_runner, cron_job_executor, instance_name, index_worker, fast_forward_worker, search_worker, search_and_vector_bootstrap_worker, table_summary_worker, schema_worker, export_worker, snapshot_import_worker, system_table_cleanup_worker, migration_worker, log_visibility, module_cache, system_env_var_names: default_system_env_vars.into_keys().collect(), app_auth, log_manager_client, }) } pub fn runtime(&self) -> RT { self.runtime.clone() } pub fn modules_storage(&self) -> &Arc<dyn Storage> { &self.application_storage.modules_storage } pub fn modules_cache(&self) -> &ModuleCache<RT> { &self.module_cache } pub fn key_broker(&self) -> &KeyBroker { &self.key_broker } pub fn runner(&self) -> Arc<ApplicationFunctionRunner<RT>> { self.runner.clone() } pub fn function_log(&self) -> FunctionExecutionLog<RT> { self.function_log.clone() } pub fn log_manager_client(&self) -> &LogManagerClient { &self.log_manager_client } pub fn now_ts_for_reads(&self) -> RepeatableTimestamp { self.database.now_ts_for_reads() } pub fn instance_name(&self) -> String { self.instance_name.clone() } #[fastrace::trace] pub async fn begin(&self, identity: Identity) -> anyhow::Result<Transaction<RT>> { self.database.begin(identity).await } #[cfg(any(test, feature = "testing"))] pub async fn commit_test(&self, transaction: Transaction<RT>) -> anyhow::Result<Timestamp> { self.commit(transaction, "test").await } #[fastrace::trace] pub async fn commit( &self, 
transaction: Transaction<RT>, write_source: impl Into<WriteSource>, ) -> anyhow::Result<Timestamp> { self.database .commit_with_write_source(transaction, write_source) .await } #[fastrace::trace] pub async fn subscribe(&self, token: Token) -> anyhow::Result<Subscription> { self.database.subscribe(token).await } pub fn usage_counter(&self) -> UsageCounter { self.database.usage_counter() } pub fn snapshot(&self, ts: RepeatableTimestamp) -> anyhow::Result<Snapshot> { self.database.snapshot(ts) } pub fn latest_snapshot(&self) -> anyhow::Result<Snapshot> { self.database.latest_snapshot() } pub fn app_auth(&self) -> &Arc<ApplicationAuth> { &self.app_auth } pub async fn search_with_compiled_query( &self, index_id: IndexId, printable_index_name: IndexName, query: pb::searchlight::TextQuery, pending_updates: Vec<DocumentUpdate>, ts: RepeatableTimestamp, ) -> anyhow::Result<RevisionWithKeys> { self.database .search_with_compiled_query(index_id, printable_index_name, query, pending_updates, ts) .await } pub async fn vector_search( &self, identity: Identity, query: VectorSearch, ) -> anyhow::Result<(Vec<PublicVectorSearchQueryResult>, FunctionUsageStats)> { self.database.vector_search(identity, query).await } pub async fn get_source_code( &self, identity: Identity, path: ModulePath, component: ComponentId, ) -> anyhow::Result<Option<String>> { let mut tx = self.begin(identity).await?; let path = CanonicalizedComponentModulePath { component, module_path: path.canonicalize(), }; let Some(metadata) = ModuleModel::new(&mut tx).get_metadata(path.clone()).await? else { return Ok(None); }; let Some(analyze_result) = &metadata.analyze_result else { return Ok(None); }; let Some(source_index) = analyze_result.source_index else { return Ok(None); }; let Some(full_source) = self.module_cache.get_module(&mut tx, path).await? 
else { return Ok(None); }; let Some(source_map_str) = &full_source.source_map else { return Ok(None); }; let Some(source_map) = source_map_from_slice(source_map_str.as_bytes()) else { return Ok(None); }; let Some(source_map_content) = source_map.get_source_contents(source_index) else { return Ok(None); }; Ok(Some(source_map_content.to_owned())) } pub async fn storage_generate_upload_url( &self, identity: Identity, component: ComponentId, ) -> anyhow::Result<String> { let issued_ts = self.runtime().unix_timestamp(); let mut tx = self.begin(identity).await?; let url = self .file_storage .transactional_file_storage .generate_upload_url(&mut tx, self.key_broker(), issued_ts, component) .await?; Ok(url) } pub async fn read_only_udf( &self, request_id: RequestId, path: PublicFunctionPath, args: Vec<JsonValue>, identity: Identity, caller: FunctionCaller, ) -> anyhow::Result<RedactedQueryReturn> { let ts = *self.now_ts_for_reads(); self.read_only_udf_at_ts(request_id, path, args, identity, ts, None, caller) .await } #[fastrace::trace] pub async fn read_only_udf_at_ts( &self, request_id: RequestId, path: PublicFunctionPath, args: Vec<JsonValue>, identity: Identity, ts: Timestamp, journal: Option<Option<String>>, caller: FunctionCaller, ) -> anyhow::Result<RedactedQueryReturn> { let persistence_version = self.database.persistence_version(); let block_logging = self .log_visibility .should_redact_logs_and_error( &mut self.begin(identity.clone()).await?, identity.clone(), caller.allowed_visibility(), ) .await?; let query_return: anyhow::Result<_> = try { let journal = journal .map(|serialized_journal| { self.key_broker .decrypt_query_journal(serialized_journal, persistence_version) }) .transpose()?; self.runner .run_query_at_ts( request_id.clone(), path, args, identity, ts, journal, caller, ) .await? }; let redacted_query_return = match query_return { Ok(query_return) => RedactedQueryReturn { result: match query_return.result { Ok(r) => Ok(r), Err(e) => Err(RedactedJsError::from_js_error(e, block_logging, request_id)), }, log_lines: RedactedLogLines::from_log_lines(query_return.log_lines, block_logging), token: query_return.token, journal: self .key_broker .encrypt_query_journal(&query_return.journal, persistence_version), }, Err(e) if e.is_deterministic_user_error() => RedactedQueryReturn { result: Err(RedactedJsError::from_js_error( JsError::from_error(e), block_logging, request_id, )), log_lines: RedactedLogLines::empty(), // Create a token for an empty read set because we haven't // done any reads yet. token: Token::empty(ts), journal: self .key_broker .encrypt_query_journal(&QueryJournal::new(), persistence_version), }, Err(e) => anyhow::bail!(e), }; Ok(redacted_query_return) } #[fastrace::trace] pub async fn mutation_udf( &self, request_id: RequestId, path: PublicFunctionPath, args: Vec<JsonValue>, identity: Identity, // Identifier used to make this mutation idempotent. 
mutation_identifier: Option<SessionRequestIdentifier>, caller: FunctionCaller, mutation_queue_length: Option<usize>, ) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>> { identity.ensure_can_run_function(UdfType::Mutation)?; let block_logging = self .log_visibility .should_redact_logs_and_error( &mut self.begin(identity.clone()).await?, identity.clone(), caller.allowed_visibility(), ) .await?; let result = match self .runner .retry_mutation( request_id.clone(), path, args, identity, mutation_identifier, caller, mutation_queue_length, ) .await { Ok(Ok(mutation_return)) => Ok(RedactedMutationReturn { value: mutation_return.value, log_lines: RedactedLogLines::from_log_lines( mutation_return.log_lines, block_logging, ), ts: mutation_return.ts, }), Ok(Err(mutation_error)) => Err(RedactedMutationError { error: RedactedJsError::from_js_error( mutation_error.error, block_logging, request_id, ), log_lines: RedactedLogLines::from_log_lines( mutation_error.log_lines, block_logging, ), }), Err(e) if e.is_deterministic_user_error() => Err(RedactedMutationError { error: RedactedJsError::from_js_error( JsError::from_error(e), block_logging, request_id, ), log_lines: RedactedLogLines::empty(), }), Err(e) => anyhow::bail!(e), }; Ok(result) } #[fastrace::trace] pub async fn action_udf( &self, request_id: RequestId, name: PublicFunctionPath, args: Vec<JsonValue>, identity: Identity, caller: FunctionCaller, ) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> { identity.ensure_can_run_function(UdfType::Action)?; let block_logging = self .log_visibility .should_redact_logs_and_error( &mut self.begin(identity.clone()).await?, identity.clone(), caller.allowed_visibility(), ) .await?; let should_spawn = caller.run_until_completion_if_cancelled(); let runner: Arc<ApplicationFunctionRunner<RT>> = self.runner.clone(); let request_id_ = request_id.clone(); let span = SpanContext::current_local_parent() .map(|ctx| Span::root(format!("{}::actions_future", func_path!()), ctx)) .unwrap_or(Span::noop()); let run_action = async move { runner .run_action(request_id_, name, args, identity, caller) .in_span(span) .await }; let result = if should_spawn { // Spawn running the action in a separate future. This way, even if we // get cancelled, it will continue to run to completion. let (tx, rx) = oneshot::channel(); // TODO: cancel this handle with the application self.runtime.spawn_background("run_action", async move { let result = run_action.await; // Don't log errors if the caller has gone away. _ = tx.send(result); }); rx.await .context("run_action one shot sender dropped prematurely?")? } else { // Await the action future. This means if we get cancelled the action // future will get dropped. 
run_action.await }; let result = match result { Ok(Ok(action_return)) => Ok(RedactedActionReturn { value: action_return.value, log_lines: RedactedLogLines::from_log_lines(action_return.log_lines, block_logging), }), Ok(Err(action_error)) => Err(RedactedActionError { error: RedactedJsError::from_js_error( action_error.error, block_logging, request_id, ), log_lines: RedactedLogLines::from_log_lines(action_error.log_lines, block_logging), }), Err(e) => anyhow::bail!(e), }; Ok(result) } #[fastrace::trace] pub async fn http_action_udf( &self, request_id: RequestId, http_request: HttpActionRequest, identity: Identity, caller: FunctionCaller, mut response_streamer: HttpActionResponseStreamer, ) -> anyhow::Result<()> { identity.ensure_can_run_function(UdfType::HttpAction)?; let block_logging = self .log_visibility .should_redact_logs_and_error( &mut self.begin(identity.clone()).await?, identity.clone(), caller.allowed_visibility(), ) .await?; let rt_ = self.runtime.clone(); // Spawn running the action in a separate future. This way, even if we // get cancelled, it will continue to run to completion. let (tx, rx) = oneshot::channel(); let runner = self.runner.clone(); let span = SpanContext::current_local_parent() .map(|ctx| Span::root(format!("{}::http_actions_future", func_path!()), ctx)) .unwrap_or(Span::noop()); let response_streamer_ = response_streamer.clone(); // TODO: cancel this handle with the application self.runtime .spawn_background("run_http_action", async move { let result = runner .run_http_action( request_id, http_request, response_streamer_, identity, caller, ) .in_span(span) .await; if let Err(Err(mut e)) = tx.send(result) { // If the caller has gone away, and the result is a system error, // log to sentry. report_error(&mut e).await; } rt_.pause_client().wait("end_run_http_action").await; }); let result = rx .await .context("run_http_action one shot sender dropped prematurely?")?; match result { Ok(HttpActionResult::Error(error)) => { let response_parts = RedactedJsError::from_js_error(error, block_logging, RequestId::new()) .to_http_response_parts(); for part in response_parts { response_streamer.send_part(part)??; } }, Ok(HttpActionResult::Streamed) => (), Err(e) => anyhow::bail!(e), }; Ok(()) } /// Run a function of an arbitrary type from its name pub async fn any_udf( &self, request_id: RequestId, path: CanonicalizedComponentFunctionPath, args: Vec<JsonValue>, identity: Identity, caller: FunctionCaller, ) -> anyhow::Result<Result<FunctionReturn, FunctionError>> { let block_logging = self .log_visibility .should_redact_logs_and_error( &mut self.begin(identity.clone()).await?, identity.clone(), caller.allowed_visibility(), ) .await?; // We use a separate transaction to get the type of the UDF before calling the // appropriate type-specific code. While this could lead to incorrect // "function not found" messages errors if the user changes the type of the // UDF between the two transactions without deleting it, this situation is // rare enough to disregard it. let mut tx_type = self.begin(identity.clone()).await?; let canonicalized_path = path.clone(); let Some(analyzed_function) = ModuleModel::new(&mut tx_type) .get_analyzed_function(&canonicalized_path) .await? .ok() .filter(|af| { (identity.is_admin() || af.visibility == Some(Visibility::Public)) && af.udf_type != UdfType::HttpAction }) else { let missing_or_internal = format!( "Could not find function for '{}'{}. 
Did you forget to run `npx convex dev` or \ `npx convex deploy`?", String::from(canonicalized_path.udf_path.strip()), canonicalized_path.component.in_component_str(), ); return Ok(Err(FunctionError { error: RedactedJsError::from_js_error( JsError::from_message(missing_or_internal), block_logging, request_id, ), log_lines: RedactedLogLines::empty(), })); }; identity.ensure_can_run_function(analyzed_function.udf_type)?; match analyzed_function.udf_type { UdfType::Query => self .read_only_udf( request_id, PublicFunctionPath::Component(path), args, identity, caller, ) .await .map( |RedactedQueryReturn { result, log_lines, .. }| { match result { Ok(value) => Ok(FunctionReturn { value, log_lines }), Err(error) => Err(FunctionError { error, log_lines }), } }, ), UdfType::Mutation => self .mutation_udf( request_id, PublicFunctionPath::Component(path), args, identity, None, caller, None, ) .await .map(|res| { res.map( |RedactedMutationReturn { value, log_lines, .. }| FunctionReturn { value, log_lines }, ) .map_err( |RedactedMutationError { error, log_lines, .. }| FunctionError { error, log_lines }, ) }), UdfType::Action => self .action_udf( request_id, PublicFunctionPath::Component(path), args, identity, caller, ) .await .map(|res| { res.map( |RedactedActionReturn { value, log_lines, .. }| FunctionReturn { value, log_lines }, ) .map_err( |RedactedActionError { error, log_lines, .. }| FunctionError { error, log_lines }, ) }), UdfType::HttpAction => { anyhow::bail!( "HTTP actions not supported in the /functions endpoint. A \"not found\" \ message should be returned instead." ) }, } } pub async fn request_export( &self, identity: Identity, format: ExportFormat, component: ComponentId, requestor: ExportRequestor, expiration_ts_ns: Option<u64>, ) -> anyhow::Result<DeveloperDocumentId> { anyhow::ensure!( identity.is_admin() || identity.is_system(), unauthorized_error("request_export") ); if let Some(expiration_ts_ns) = expiration_ts_ns { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .context("Time went backward")?; anyhow::ensure!( expiration_ts_ns >= now.as_nanos() as u64, ErrorMetadata::bad_request( "SnapshotExpirationInPast", "Snapshot expiration in past." ) ); let how_far = Duration::from_nanos(expiration_ts_ns) - now; anyhow::ensure!( how_far <= Duration::from_days(60), ErrorMetadata::bad_request( "SnapshotExpirationTooLarge", format!( "Snapshot expiration is {} days in the future. 
Must be <= 60", how_far.as_secs() / (60 * 60 * 24) ), ) ); } let mut tx = self.begin(identity).await?; let mut exports_model = ExportsModel::new(&mut tx); let export_requested = exports_model.latest_requested().await?; let export_in_progress = exports_model.latest_in_progress().await?; let snapshot_id = match (export_requested, export_in_progress) { (None, None) => { exports_model .insert_requested(format, component, requestor, expiration_ts_ns) .await }, _ => Err( anyhow::anyhow!("Can only have one export requested or in progress at once") .context(ErrorMetadata::bad_request( "ExportInProgress", "There is already an export requested or in progress.", )), ), }?; self.commit(tx, "request_export").await?; Ok(snapshot_id.into()) } pub async fn get_zip_export( &self, identity: Identity, id: Either<DeveloperDocumentId, Timestamp>, ) -> anyhow::Result<(StorageGetStream, String)> { let (object_key, snapshot_ts) = { let mut tx = self.begin(identity).await?; let export = match id { Either::Left(id) => ExportsModel::new(&mut tx).get(id).await?, Either::Right(ts) => { ExportsModel::new(&mut tx) .completed_export_at_ts(ts) .await? }, } .context(ErrorMetadata::not_found( "ExportNotFound", format!("The requested export {id} was not found"), ))?; match export.into_value() { Export::Completed { zip_object_key, start_ts, .. } => (zip_object_key, start_ts), Export::Failed { .. } | Export::Canceled { .. } | Export::InProgress { .. } | Export::Requested { .. } => { anyhow::bail!(ErrorMetadata::bad_request( "ExportNotComplete", format!("The requested export {id} has not completed"), )) }, } }; let storage_get_stream = self .application_storage .exports_storage .get(&object_key) .await? .context(ErrorMetadata::not_found( "ExportNotFound", format!("The requested export {snapshot_ts}/{object_key:?} was not found"), ))?; let filename = format!( // This should match the format in SnapshotExport.tsx. "snapshot_{}_{snapshot_ts}.zip", self.instance_name ); Ok((storage_get_stream, filename)) } /// Returns the cloud export key - fully qualified to the instance. pub fn cloud_export_key(&self, zip_export_key: ObjectKey) -> FullyQualifiedObjectKey { self.application_storage .exports_storage .fully_qualified_key(&zip_export_key) } pub async fn update_environment_variables( &self, tx: &mut Transaction<RT>, changes: Vec<EnvVarChange>, ) -> anyhow::Result<Vec<DeploymentAuditLogEvent>> { let mut audit_events = vec![]; let mut model = EnvironmentVariablesModel::new(tx); for change in changes { match change { EnvVarChange::Set(env_var) => { let name = env_var.name(); if let Some(_existing) = model.delete(name).await? { audit_events.push(DeploymentAuditLogEvent::UpdateEnvironmentVariable { name: name.clone(), }); } else { audit_events.push(DeploymentAuditLogEvent::CreateEnvironmentVariable { name: name.clone(), }); } model.create(env_var, &self.system_env_var_names).await?; }, EnvVarChange::Unset(name) => { if let Some(_existing) = model.delete(&name).await? 
{ audit_events .push(DeploymentAuditLogEvent::DeleteEnvironmentVariable { name }); }; }, } } let all_env_vars = model.get_all().await?; anyhow::ensure!( all_env_vars.len() as u64 <= (ENV_VAR_LIMIT as u64), env_var_limit_met(), ); Self::reevaluate_existing_auth_config(self.runner().clone(), tx).await?; Ok(audit_events) } pub async fn create_environment_variables( &self, tx: &mut Transaction<RT>, environment_variables: Vec<EnvironmentVariable>, ) -> anyhow::Result<Vec<DeploymentAuditLogEvent>> { let all_env_vars = EnvironmentVariablesModel::new(tx).get_all().await?; anyhow::ensure!( environment_variables.len() as u64 + all_env_vars.len() as u64 <= (ENV_VAR_LIMIT as u64), env_var_limit_met(), ); for environment_variable in environment_variables.clone() { self.create_one_environment_variable(tx, environment_variable) .await?; } let audit_events = environment_variables .into_iter() .map( |env_variable| DeploymentAuditLogEvent::CreateEnvironmentVariable { name: env_variable.name().to_owned(), }, ) .collect(); Ok(audit_events) } async fn create_one_environment_variable( &self, tx: &mut Transaction<RT>, environment_variable: EnvironmentVariable, ) -> anyhow::Result<()> { let mut env_var_model = EnvironmentVariablesModel::new(tx); if env_var_model .get(environment_variable.name()) .await? .is_some() { anyhow::bail!(env_var_name_not_unique(None)); } env_var_model .create(environment_variable, &self.system_env_var_names) .await?; Ok(()) } pub async fn set_initial_environment_variables( &self, environment_variables: Vec<EnvironmentVariable>, identity: Identity, ) -> anyhow::Result<()> { let mut tx = self.begin(identity).await?; if !EnvironmentVariablesModel::new(&mut tx) .get_all() .await? .is_empty() { // This deployment already has environment variables, so don't try to initialize // them again return Ok(()); } match self .create_environment_variables(&mut tx, environment_variables) .await { Ok(audit_events) => { self.commit_with_audit_log_events(tx, audit_events, "set_initial_env_vars") .await?; Ok(()) }, Err(e) => { if e.is_bad_request() { // This should not happen and likely means we have a bug in what we allow as // project default env variables. Report the error but do not fail the request. report_error(&mut anyhow::anyhow!( "Error setting initial environment variables: {e}" )) .await; Ok(()) } else { Err(e) } }, } } pub async fn delete_environment_variable( &self, tx: &mut Transaction<RT>, id: ResolvedDocumentId, ) -> anyhow::Result<DeploymentAuditLogEvent> { let mut model = EnvironmentVariablesModel::new(tx); let Some(env_var) = model.get_by_id_legacy(id).await? 
else { anyhow::bail!(ErrorMetadata::bad_request( "EnvironmentVariableNotFound", "Environment variable not found" )); }; let name = env_var.name().to_owned(); model.delete(&name).await?; Ok(DeploymentAuditLogEvent::DeleteEnvironmentVariable { name }) } pub async fn set_canonical_url( &self, tx: &mut Transaction<RT>, canonical_url: CanonicalUrl, ) -> anyhow::Result<()> { CanonicalUrlsModel::new(tx) .set_canonical_url(canonical_url.request_destination, canonical_url.url) .await?; Self::reevaluate_existing_auth_config(self.runner().clone(), tx).await } pub async fn unset_canonical_url( &self, tx: &mut Transaction<RT>, request_destination: RequestDestination, ) -> anyhow::Result<()> { CanonicalUrlsModel::new(tx) .unset_canonical_url(request_destination) .await?; Self::reevaluate_existing_auth_config(self.runner().clone(), tx).await } #[fastrace::trace] pub async fn analyze( &self, udf_config: UdfConfig, new_modules: Vec<ModuleConfig>, source_package: SourcePackage, user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>, system_env_var_overrides: BTreeMap<EnvVarName, EnvVarValue>, ) -> anyhow::Result<Result<BTreeMap<CanonicalizedModulePath, AnalyzedModule>, JsError>> { self.runner .analyze( udf_config, new_modules, source_package, user_environment_variables, system_env_var_overrides, ) .await } fn _validate_user_defined_index_fields( &self, fields: IndexedFields, ) -> anyhow::Result<IndexedFields> { // Creation time is a special case of a system field. We check that // first to provide a more useful error message. anyhow::ensure!( !fields.contains(&CREATION_TIME_FIELD_PATH), index_validation_error::fields_contain_creation_time(), ); // We do not allow system fields in user defined indexes. anyhow::ensure!( fields .iter() .flat_map(|fp| fp.fields()) .all(|f| !f.is_system()), index_validation_error::field_name_reserved() ); // Append _creationTime to the end of each index. This is so indexes have // default order that is more intuitive to the user. 
let mut fields: Vec<FieldPath> = fields.into(); fields.push(CREATION_TIME_FIELD_PATH.clone()); fields.try_into() } #[fastrace::trace] pub async fn evaluate_schema(&self, schema: ModuleConfig) -> anyhow::Result<DatabaseSchema> { self._evaluate_schema(schema).await.map_err(|e| { e.wrap_error_message(|msg| format!("Hit an error while evaluating your schema:\n{msg}")) }) } #[fastrace::trace] async fn _evaluate_schema(&self, schema: ModuleConfig) -> anyhow::Result<DatabaseSchema> { let rng_seed = self.runtime().rng().random(); let unix_timestamp = self.runtime().unix_timestamp(); let mut schema = self .runner() .evaluate_schema(schema.source, schema.source_map, rng_seed, unix_timestamp) .await?; for table_schema in schema.tables.values_mut() { for index_schema in table_schema .indexes .values_mut() .chain(table_schema.staged_db_indexes.values_mut()) { index_schema.fields = self._validate_user_defined_index_fields(index_schema.fields.clone())?; } } schema.check_index_references()?; Ok(schema) } #[fastrace::trace] pub async fn get_evaluated_auth_config( runner: Arc<ApplicationFunctionRunner<RT>>, user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>, system_env_var_overrides: BTreeMap<EnvVarName, EnvVarValue>, auth_config_module: Option<ModuleConfig>, config: &ConfigFile, ) -> anyhow::Result<Vec<AuthInfo>> { if let Some(auth_config_module) = auth_config_module { anyhow::ensure!( config.auth_info.is_none(), ErrorMetadata::bad_request( "InvalidAuthConfig", "Cannot set auth config in both auth config file and `convex.json`, remove it \ from `convex.json`" ) ); anyhow::ensure!( auth_config_module.environment != ModuleEnvironment::Node, "auth config file can't be analyzed in Node.js!" ); let auth_config = Self::evaluate_auth_config( runner, user_environment_variables, system_env_var_overrides, auth_config_module, "The pushed auth config is invalid", ) .await?; Ok(auth_config.providers) } else { config .auth_info .clone() .unwrap_or_default() .into_iter() .map(AuthInfo::try_from) .collect::<Result<Vec<_>, _>>() } } // This is only relevant to auth config set via `auth.config.js`. // Because legacy setups didn't use `auth.config.js` we do not // reset the auth config if `auth.config.js` is not present. pub async fn reevaluate_existing_auth_config( runner: Arc<ApplicationFunctionRunner<RT>>, tx: &mut Transaction<RT>, ) -> anyhow::Result<()> { let path = CanonicalizedComponentModulePath { component: ComponentId::Root, module_path: AUTH_CONFIG_FILE_NAME.parse()?, }; let auth_config_metadata = ModuleModel::new(tx).get_metadata(path.clone()).await?; if let Some(auth_config_metadata) = auth_config_metadata { let environment = auth_config_metadata.environment; let auth_config_source = runner .module_cache .get_module(tx, path) .await? 
.context("Module has metadata but no source")?; let auth_config_module = ModuleConfig { path: AUTH_CONFIG_FILE_NAME.parse()?, source: auth_config_source.source.clone(), source_map: auth_config_source.source_map.clone(), environment, }; let user_environment_variables = EnvironmentVariablesModel::new(tx).get_all().await?; let auth_config = Self::evaluate_auth_config( runner, user_environment_variables, system_env_var_overrides(tx).await?, auth_config_module, "This change would make the auth config invalid", ) .await?; AuthInfoModel::new(tx).put(auth_config.providers).await?; } Ok(()) } async fn evaluate_auth_config( runner: Arc<ApplicationFunctionRunner<RT>>, user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>, system_env_var_overrides: BTreeMap<EnvVarName, EnvVarValue>, auth_config_module: ModuleConfig, explanation: &str, ) -> anyhow::Result<AuthConfig> { runner .evaluate_auth_config( auth_config_module.source, auth_config_module.source_map, user_environment_variables, system_env_var_overrides, explanation, ) .await } #[fastrace::trace] pub async fn apply_config_with_retries( &self, identity: Identity, apply_config_args: ApplyConfigArgs, ) -> anyhow::Result<(ConfigMetadataAndSchema, OccRetryStats)> { let runner = self.runner.clone(); self.execute_with_audit_log_events_and_occ_retries_reporting_stats( identity, "apply_config", |tx| Self::_apply_config(runner.clone(), tx, apply_config_args.clone()).into(), ) .await } #[fastrace::trace] async fn _apply_config( runner: Arc<ApplicationFunctionRunner<RT>>, tx: &mut Transaction<RT>, ApplyConfigArgs { auth_module, config_file, schema_id, modules, udf_config, source_package, analyze_results, }: ApplyConfigArgs, ) -> anyhow::Result<(ConfigMetadataAndSchema, Vec<DeploymentAuditLogEvent>)> { let schema_id = schema_id .map(|schema_id| { parse_schema_id( &schema_id, tx.table_mapping(), TableNamespace::root_component(), ) .context(invalid_schema_id(&schema_id)) }) .transpose()?; let user_environment_variables = EnvironmentVariablesModel::new(tx).get_all().await?; let system_env_var_overrides = system_env_var_overrides(tx).await?; let auth_providers = Self::get_evaluated_auth_config( runner, user_environment_variables, system_env_var_overrides, auth_module, &config_file, ) .await?; let config_metadata = ConfigMetadata::from_file(config_file, auth_providers); let (config_diff, schema) = ConfigModel::new(tx, ComponentId::Root) .apply( config_metadata.clone(), modules, udf_config, Some(source_package), analyze_results, schema_id, ) .await?; ComponentConfigModel::new(tx).disable_components().await?; Ok(( ConfigMetadataAndSchema { config_metadata, schema, }, vec![DeploymentAuditLogEvent::PushConfig { config_diff }], )) } #[fastrace::trace] pub async fn analyze_modules_with_auth_config( &self, udf_config: UdfConfig, modules: Vec<ModuleConfig>, source_package: SourcePackage, user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>, system_env_var_overrides: BTreeMap<EnvVarName, EnvVarValue>, ) -> anyhow::Result<( Option<ModuleConfig>, BTreeMap<CanonicalizedModulePath, AnalyzedModule>, )> { // Don't analyze the auth config module let (auth_modules, analyzed_modules): (Vec<_>, Vec<_>) = modules.into_iter().partition(|module| { module.path.clone().canonicalize() == AUTH_CONFIG_FILE_NAME.parse().unwrap() }); let auth_module = auth_modules.first(); let mut analyze_result = self .analyze_modules( udf_config, analyzed_modules, source_package, user_environment_variables, system_env_var_overrides, ) .await?; // Add an empty analyzed result for the auth 
config module if let Some(auth_module) = auth_module { analyze_result.insert( auth_module.path.clone().canonicalize(), AnalyzedModule::default(), ); } Ok((auth_module.cloned(), analyze_result)) } async fn upload_packages( &self, config: &ProjectConfig, ) -> anyhow::Result<( Option<ExternalDepsPackageId>, BTreeMap<ComponentDefinitionPath, SourcePackage>, )> { let upload_limit = Arc::new(Semaphore::new(*APPLICATION_MAX_CONCURRENT_UPLOADS)); let root_future = async { let permit = upload_limit.acquire().await?; let external_deps_id_and_pkg = if !config.node_dependencies.is_empty() { let deps = self .build_external_node_deps(config.node_dependencies.clone()) .await?; Some(deps) } else { None }; let app_modules = config.app_definition.modules().cloned().collect(); let app_pkg = self .upload_package( &app_modules, external_deps_id_and_pkg.clone(), config.node_version, ) .await?; drop(permit); Ok((external_deps_id_and_pkg, app_pkg)) }; let mut component_pkg_futures = JoinSet::new(); for component_def in &config.component_definitions { let app = self.clone(); let definition_path = component_def.definition_path.clone(); let component_modules = component_def.modules().cloned().collect(); let upload_limit = upload_limit.clone(); let component_pkg_future = async move { let permit = upload_limit.acquire().await?; let component_pkg = app.upload_package(&component_modules, None, None).await?; drop(permit); anyhow::Ok((definition_path, component_pkg)) }; component_pkg_futures.spawn("upload_package", component_pkg_future); } // `JoinSet::join_all` was added in tokio 1.40.0. let component_pkg_future = async { let mut result = Vec::with_capacity(config.component_definitions.len()); while let Some(component_pkg) = component_pkg_futures.join_next().await { result.push(component_pkg??); } anyhow::Ok(result) }; let ((external_deps, app_pkg), component_pkgs) = tokio::try_join!(root_future, component_pkg_future)?; let mut total_size = PackageSize::default(); if let Some((_, ref pkg)) = external_deps { total_size += pkg.package_size; } total_size += app_pkg.package_size; for (_, pkg) in &component_pkgs { total_size += pkg.package_size; } total_size.verify_size()?; let mut component_definition_packages = BTreeMap::new(); component_definition_packages.insert(ComponentDefinitionPath::root(), app_pkg); for (definition_path, component_pkg) in component_pkgs { anyhow::ensure!(component_definition_packages .insert(definition_path, component_pkg) .is_none()); } let external_deps_id = external_deps.map(|(id, _)| id); Ok((external_deps_id, component_definition_packages)) } // Helper method to call analyze and throw appropriate HttpError. #[fastrace::trace] pub async fn analyze_modules( &self, udf_config: UdfConfig, modules: Vec<ModuleConfig>, source_package: SourcePackage, user_environment_variables: BTreeMap<EnvVarName, EnvVarValue>, system_env_var_overrides: BTreeMap<EnvVarName, EnvVarValue>, ) -> anyhow::Result<BTreeMap<CanonicalizedModulePath, AnalyzedModule>> { let num_dep_modules = modules.iter().filter(|m| m.path.is_deps()).count(); anyhow::ensure!( modules.len() - num_dep_modules <= *MAX_USER_MODULES, ErrorMetadata::bad_request( "InvalidModules", format!( r#"Too many function files ({} > maximum {}) in "convex/". See our docs (https://docs.convex.dev/using/writing-convex-functions#using-libraries) for more details."#, modules.len() - num_dep_modules, *MAX_USER_MODULES ), ) ); // We exclude dependency modules from the user limit since they don't depend on // the developer. 
We don't expect dependencies to be more than the user defined // modules though. If we ever have crazy amount of dependency modules, // throw a system errors so we can debug. anyhow::ensure!( modules.len() <= 2 * *MAX_USER_MODULES, "Too many dependencies modules! Dependencies: {}, Total modules: {}", num_dep_modules, modules.len() ); // Run analyze the modules to make sure they are valid. match self .analyze( udf_config, modules, source_package, user_environment_variables, system_env_var_overrides, ) .await? { Ok(m) => Ok(m), Err(js_error) => { let e = ErrorMetadata::bad_request( "InvalidModules", format!( "Loading the pushed modules encountered the following error:\n{js_error}" ), ); Err(anyhow::anyhow!(js_error).context(e)) }, } } pub async fn start_upload_for_snapshot_import( &self, identity: Identity, ) -> anyhow::Result<ClientDrivenUploadToken> { if !identity.is_admin() { anyhow::bail!(ErrorMetadata::forbidden( "InvalidImport", "Only an admin of the deployment can import" )); } let upload = self .application_storage .snapshot_imports_storage .start_client_driven_upload() .await?; Ok(upload) } pub async fn upload_part_for_snapshot_import( &self, identity: Identity, upload_token: ClientDrivenUploadToken, part_number: u16, part: Bytes, ) -> anyhow::Result<ClientDrivenUploadPartToken> { if !identity.is_admin() { anyhow::bail!(ErrorMetadata::forbidden( "InvalidImport", "Only an admin of the deployment can import" )); } let part_token = self .application_storage .snapshot_imports_storage .upload_part(upload_token, part_number, part) .await?; Ok(part_token) } pub async fn import_finish_upload( &self, identity: Identity, format: ImportFormat, mode: ImportMode, component_path: ComponentPath, upload_token: ClientDrivenUploadToken, part_tokens: Vec<ClientDrivenUploadPartToken>, ) -> anyhow::Result<DeveloperDocumentId> { if !identity.is_admin() { anyhow::bail!(ErrorMetadata::forbidden( "InvalidImport", "Only an admin of the deployment can import" )); } let object_key = self .application_storage .snapshot_imports_storage .finish_client_driven_upload(upload_token, part_tokens) .await?; let fq_key = self .application_storage .snapshot_imports_storage .fully_qualified_key(&object_key); start_stored_import( self, identity, format, mode, component_path, fq_key, ImportRequestor::SnapshotImport, ) .await } pub async fn upload_snapshot_import( &self, body_stream: BoxStream<'_, anyhow::Result<Bytes>>, ) -> anyhow::Result<FullyQualifiedObjectKey> { let mut upload: Box<BufferedUpload> = self .application_storage .snapshot_imports_storage .start_upload() .await?; // unclear why this reassignment is necessary let mut body_stream = body_stream; upload.try_write_parallel(&mut body_stream).await?; drop(body_stream); let object_key = upload.complete().await?; Ok(self .application_storage .snapshot_imports_storage .fully_qualified_key(&object_key)) } #[fastrace::trace] pub async fn upload_package( &self, modules: &Vec<ModuleConfig>, external_deps_id_and_pkg: Option<(ExternalDepsPackageId, ExternalDepsPackage)>, node_version: Option<NodeVersion>, ) -> anyhow::Result<SourcePackage> { // If there are any node actions, turn on the lambdas. 
if modules .iter() .any(|m| m.environment == ModuleEnvironment::Node) { self.runner().enable_actions()?; } tracing::info!( "Uploading package with {} modules to Storage", modules.len() ); // Canonicalize the modules let package: BTreeMap<_, _> = modules .iter() .map(|m| (m.path.clone().canonicalize(), m)) .collect(); anyhow::ensure!( modules.len() == package.len(), ErrorMetadata::bad_request( "CanonicalizationConflict", "Multiple modules canonicalize to the same name.", ) ); let (external_deps_package_id, external_deps_pkg) = match external_deps_id_and_pkg { Some((id, pkg)) => (Some(id), Some(pkg)), _ => (None, None), }; let (storage_key, sha256, package_size) = upload_package( package, self.application_storage.modules_storage.clone(), external_deps_pkg.map(|pkg| pkg.storage_key), ) .await?; tracing::info!("Upload of {storage_key:?} successful"); tracing::info!("Source package size: {}", package_size); log_source_package_size_bytes_total(package_size); Ok(SourcePackage { storage_key, sha256, external_deps_package_id, package_size, node_version, }) } // Clear all records for specified tables concurrently, potentially taking // multiple transactions for each. Returns the total number of documents // deleted. pub async fn clear_tables( &self, identity: &Identity, table_names: Vec<(ComponentPath, TableName)>, ) -> anyhow::Result<u64> { clear_tables(self, identity, table_names).await } pub async fn execute_standalone_module( &self, request_id: RequestId, module: ModuleConfig, args: Vec<JsonValue>, identity: Identity, caller: FunctionCaller, component: ComponentId, ) -> anyhow::Result<Result<FunctionReturn, FunctionError>> { let block_logging = self .log_visibility .should_redact_logs_and_error( &mut self.begin(identity.clone()).await?, identity.clone(), caller.allowed_visibility(), ) .await?; // Write (and commit) the module source to S3. // This will become a dangling reference since the _modules entry won't // be committed to the database, but we have to deal with those anyway. let source_package = self .upload_package(&vec![module.clone()], None, None) .await?; let mut tx = self.begin(identity.clone()).await?; let (user_environment_variables, system_env_var_overrides) = if component.is_root() { let user_environment_variables = EnvironmentVariablesModel::new(&mut tx).get_all().await?; ( user_environment_variables, system_env_var_overrides(&mut tx).await?, ) } else { (BTreeMap::new(), BTreeMap::new()) }; let mut udf_config_model = UdfConfigModel::new(&mut tx, component.into()); let udf_config = match udf_config_model.get().await? { Some(udf_config) => (**udf_config).clone(), None => { // If there hasn't been a push // yet, act like the most recent version. let udf_config = UdfConfig { server_version: Version::new(1000, 0, 0), import_phase_rng_seed: self.runtime.rng().random(), import_phase_unix_timestamp: self.runtime.unix_timestamp(), }; udf_config_model.set(udf_config.clone()).await?; udf_config }, }; // 1. analyze the module // We can analyze this module by itself, without combining it with the existing // modules since this module should be self-contained and not import // from other modules. let analyze_results = self .analyze( udf_config.clone(), vec![module.clone()], source_package.clone(), user_environment_variables, system_env_var_overrides, ) .await? 
.map_err(|js_error| { let metadata = ErrorMetadata::bad_request( "InvalidModules", format!("Could not analyze the given module:\n{js_error}"), ); anyhow::anyhow!(js_error).context(metadata) })?; let module_path = module.path.clone().canonicalize(); let analyzed_module = analyze_results .get(&module_path) .ok_or_else(|| anyhow::anyhow!("Unexpectedly missing analyze result"))? .clone(); // 2. get the function type let mut analyzed_function = None; for function in &analyzed_module.functions { if function.name.is_default_export() { analyzed_function = Some(function.clone()); } else { anyhow::bail!(ErrorMetadata::bad_request( "InvalidTestQuery", "Only `export default` is supported." )); } } let analyzed_function = analyzed_function.context(ErrorMetadata::bad_request( "InvalidTestQuery", "Default export is not a Convex function.", ))?; let source_package_id = SourcePackageModel::new(&mut tx, component.into()) .put(source_package) .await?; // 3. Add the module let path = CanonicalizedComponentModulePath { component, module_path: module_path.clone(), }; let module_id = ModuleModel::new(&mut tx) .get_metadata(path.clone()) .await? .map(|m| m.id()); ModuleModel::new(&mut tx) .put( module_id, path, module.source, source_package_id, module.source_map, Some(analyzed_module), ModuleEnvironment::Isolate, ) .await?; // 4. run the function within the transaction let function_name = FunctionName::default_export(); let component_path = BootstrapComponentsModel::new(&mut tx).must_component_path(component)?; let path = CanonicalizedComponentFunctionPath { component: component_path, udf_path: CanonicalizedUdfPath::new(module_path, function_name), }; let arguments = parse_udf_args(&path.udf_path, args)?; let (result, log_lines) = match analyzed_function.udf_type { UdfType::Query => { self.runner .run_query_without_caching(request_id.clone(), tx, path, arguments, caller) .await }, UdfType::Mutation => { anyhow::bail!(ErrorMetadata::bad_request( "UnsupportedTestQuery", "Mutations are not supported in the REPL yet." )) }, UdfType::Action => { anyhow::bail!(ErrorMetadata::bad_request( "UnsupportedTestQuery", "Actions are not supported in the REPL yet." )) }, UdfType::HttpAction => { anyhow::bail!(ErrorMetadata::bad_request( "UnsupportedTestQuery", "HTTP actions are not supported in the REPL yet." 
)) }, }?; let log_lines = RedactedLogLines::from_log_lines(log_lines, block_logging); Ok(match result { Ok(value) => Ok(FunctionReturn { value, log_lines }), Err(error) => Err(FunctionError { error: RedactedJsError::from_js_error(error, block_logging, request_id), log_lines, }), }) } #[fastrace::trace] pub async fn build_external_node_deps( &self, deps: Vec<NodeDependency>, ) -> anyhow::Result<(ExternalDepsPackageId, ExternalDepsPackage)> { // Check the cache to see if we've built this package recently let mut tx = self.begin(Identity::system()).await?; let mut model = ExternalPackagesModel::new(&mut tx); let cached_match = model.get_cached_package_match(deps.clone()).await?; if let Some((cached_id, cached_pkg)) = cached_match { tracing::info!("Cache hit for external deps package!"); log_external_deps_package(true); return Ok((cached_id, cached_pkg)); } else { log_external_deps_package(false); tracing::info!("Cache miss for external deps package, running build_deps..."); } let result = self.runner().build_deps(deps).await?; let pkg = match result { Ok(pkg) => pkg, Err(js_error) => { let e = ErrorMetadata::bad_request( "InvalidExternalModules", format!( "Loading the pushed modules encountered the following error:\n{js_error}" ), ); return Err(anyhow::anyhow!(js_error).context(e)); }, }; // Write the package to the system table let id = self._upload_external_deps_package(pkg.clone()).await?; Ok((id, pkg)) } #[fastrace::trace] async fn _upload_external_deps_package( &self, external_deps_package: ExternalDepsPackage, ) -> anyhow::Result<ExternalDepsPackageId> { let mut tx = self.begin(Identity::system()).await?; let mut model = ExternalPackagesModel::new(&mut tx); let result = model.put(external_deps_package).await?; self.commit(tx, "upload_external_deps_package").await?; Ok(result) } /// Deletes the given user tables in one transaction. /// Returns the total number of documents in all tables deleted. pub async fn delete_tables( &self, identity: &Identity, table_names: Vec<TableName>, table_namespace: TableNamespace, ) -> anyhow::Result<u64> { let mut tx = self.begin(identity.clone()).await?; let mut count = 0; for table_name in table_names { anyhow::ensure!( !table_name.is_system(), "cannot delete system table {table_name}" ); let mut table_model = TableModel::new(&mut tx); count += table_model.must_count(table_namespace, &table_name).await?; table_model .delete_active_table(table_namespace, table_name) .await?; } self.commit(tx, "delete_tables").await?; Ok(count) } pub async fn delete_component( &self, identity: &Identity, component_id: ComponentId, ) -> anyhow::Result<()> { let mut tx = self.begin(identity.clone()).await?; ComponentConfigModel::new(&mut tx) .delete_component(component_id) .await?; self.commit(tx, "delete_component").await?; Ok(()) } /// Add system indexes if they do not already exist and update /// existing indexes if needed. pub async fn _add_system_indexes( &self, identity: &Identity, indexes: BTreeMap<IndexName, IndexedFields>, ) -> anyhow::Result<()> { let mut tx = self.begin(identity.clone()).await?; let namespace = TableNamespace::by_component_TODO(); for (index_name, index_fields) in indexes.into_iter() { let index_fields = self._validate_user_defined_index_fields(index_fields)?; let index_metadata = IndexMetadata::new_backfilling(*tx.begin_timestamp(), index_name, index_fields); let mut model = IndexModel::new(&mut tx); if let Some(existing_index_metadata) = model .pending_index_metadata(namespace, &index_metadata.name)?
.or(model.enabled_index_metadata(namespace, &index_metadata.name)?) { if !index_metadata .config .same_spec(&existing_index_metadata.config) { IndexModel::new(&mut tx) .drop_index(existing_index_metadata.id()) .await?; IndexModel::new(&mut tx) .add_system_index(namespace, index_metadata) .await?; } } else { IndexModel::new(&mut tx) .add_system_index(namespace, index_metadata) .await?; } } self.commit(tx, "add_system_indexes").await?; Ok(()) } async fn bail_if_not_running(&self) -> anyhow::Result<()> { let backend_state = BackendStateModel::new(&mut self.begin(Identity::Unknown(None)).await?) .get_backend_state() .await?; if backend_state.is_stopped() { anyhow::bail!(ErrorMetadata::bad_request( "BackendIsNotRunning", "Cannot perform this operation when the backend is not running" )); } Ok(()) } pub async fn store_file( &self, component: ComponentId, content_length: Option<ContentLength>, content_type: Option<ContentType>, expected_sha256: Option<Sha256Digest>, body: BoxStream<'_, anyhow::Result<Bytes>>, ) -> anyhow::Result<DeveloperDocumentId> { self.bail_if_not_running().await?; let storage_id = self .file_storage .store_file( component.into(), content_length, content_type, body, expected_sha256, &self.usage_tracking, ) .await?; Ok(storage_id) } pub async fn store_file_entry( &self, component: ComponentId, entry: FileStorageEntry, ) -> anyhow::Result<DeveloperDocumentId> { let storage_id = self .file_storage .store_entry(component.into(), entry, &self.usage_tracking) .await?; Ok(storage_id) } pub async fn get_file_entry( &self, component: ComponentId, storage_id: FileStorageId, ) -> anyhow::Result<FileStorageEntry> { let mut file_storage_tx = self.begin(Identity::system()).await?; let Some(file_entry) = self .file_storage .transactional_file_storage // The transaction is not part of UDF so use the global usage counters. .get_file_entry(&mut file_storage_tx, component.into(), storage_id.clone()) .await? else { return Err(ErrorMetadata::not_found( "FileNotFound", format!("File {storage_id} not found"), ) .into()); }; Ok(file_entry) } pub async fn get_file( &self, component: ComponentId, storage_id: FileStorageId, ) -> anyhow::Result<FileStream> { self.bail_if_not_running().await?; let mut file_storage_tx = self.begin(Identity::system()).await?; let Some(file_entry) = self .file_storage .transactional_file_storage // The transaction is not part of UDF so use the global usage counters. .get_file_entry(&mut file_storage_tx, component.into(), storage_id.clone()) .await? else { return Err(ErrorMetadata::not_found( "FileNotFound", format!("File {storage_id} not found"), ) .into()); }; let Some(component_path) = file_storage_tx.get_component_path(component) else { return Err(ErrorMetadata::not_found( "FileNotFound", format!("Component {component:?} not found"), ) .into()); }; self .file_storage .transactional_file_storage // The transaction is not part of UDF so use the global usage counters. .get_file_stream(component_path, file_entry, self.usage_tracking.clone()) .await } pub async fn get_file_range( &self, component: ComponentId, storage_id: FileStorageId, bytes_range: (Bound<u64>, Bound<u64>), ) -> anyhow::Result<FileRangeStream> { self.bail_if_not_running().await?; let mut file_storage_tx = self.begin(Identity::system()).await?; let Some(file_entry) = self .file_storage .transactional_file_storage // The transaction is not part of UDF so use the global usage counters. .get_file_entry(&mut file_storage_tx, component.into(), storage_id.clone()) .await? 
else { return Err(ErrorMetadata::not_found( "FileNotFound", format!("File {storage_id} not found"), ) .into()); }; let Some(component_path) = file_storage_tx.get_component_path(component) else { return Err(ErrorMetadata::not_found( "FileNotFound", format!("Component {component:?} not found"), ) .into()); }; self .file_storage .transactional_file_storage // The transaction is not part of UDF so use the global usage counters. .get_file_range_stream( component_path, file_entry, bytes_range, self.usage_tracking.clone(), ) .await } pub async fn authenticate( &self, token: AuthenticationToken, system_time: SystemTime, ) -> anyhow::Result<Identity> { let identity = match token { AuthenticationToken::Admin(token, acting_as) => { let admin_identity = self .app_auth() .check_key(token.to_string(), self.instance_name()) .await?; match acting_as { Some(acting_user) => { // Act as the given user let Identity::InstanceAdmin(i) = admin_identity else { anyhow::bail!( "Admin identity returned from check_admin_key was not an admin." ); }; Identity::ActingUser(i, acting_user) }, None => admin_identity, } }, AuthenticationToken::User(id_token) => { let mut tx = self.begin(Identity::system()).await?; let auth_infos = AuthInfoModel::new(&mut tx).get().await?; let auth_info_values: Vec<_> = auth_infos .into_iter() .map(|auth_info| auth_info.into_value()) .collect(); let should_redact_errors = self .log_visibility .should_redact_logs_and_error( &mut tx, Identity::Unknown(None), AllowedVisibility::PublicOnly, ) .await?; let identity_result = validate_id_token( // This is any JWT. AuthIdToken(id_token), cached_http_client_for(ClientPurpose::ProviderMetadata), auth_info_values, system_time, should_redact_errors, ) .await; Identity::user(identity_result?) }, AuthenticationToken::None => Identity::Unknown(None), }; Ok(identity) } pub async fn validate_component_id( &self, identity: Identity, component_id: ComponentId, ) -> anyhow::Result<()> { let mut tx = self.begin(identity).await?; anyhow::ensure!( tx.get_component_path(component_id).is_some(), "Component {component_id:?} not found" ); Ok(()) } pub async fn udf_rate( &self, identity: Identity, identifier: UdfIdentifier, metric: UdfRate, window: MetricsWindow, ) -> anyhow::Result<Timeseries> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("udf_rate")); } self.function_log.udf_rate(identifier, metric, window) } pub async fn failure_percentage_top_k( &self, identity: Identity, window: MetricsWindow, k: usize, ) -> anyhow::Result<Vec<(String, Timeseries)>> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("failure_percentage_top_k")); } self.function_log.failure_percentage_top_k(window, k) } pub async fn cache_hit_percentage_top_k( &self, identity: Identity, window: MetricsWindow, k: usize, ) -> anyhow::Result<Vec<(String, Timeseries)>> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("cache_hit_percentage_top_k")); } self.function_log.cache_hit_percentage_top_k(window, k) } pub async fn cache_hit_percentage( &self, identity: Identity, identifier: UdfIdentifier, window: MetricsWindow, ) -> anyhow::Result<Timeseries> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("cache_hit_percentage")); } self.function_log.cache_hit_percentage(identifier, window) } pub async fn latency_percentiles( &self, identity: Identity, identifier: UdfIdentifier, percentiles: Vec<Percentile>, window: MetricsWindow, ) -> 
anyhow::Result<BTreeMap<Percentile, Timeseries>> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("latency_percentiles_ms")); } self.function_log .latency_percentiles(identifier, percentiles, window) } pub async fn udf_summary( &self, identity: Identity, cursor: Option<CursorMs>, ) -> anyhow::Result<(Option<UdfMetricSummary>, Option<CursorMs>)> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("udf_summary")); } Ok(self.function_log.udf_summary(cursor)) } pub async fn table_rate( &self, identity: Identity, name: TableName, metric: TableRate, window: MetricsWindow, ) -> anyhow::Result<Timeseries> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("table_rate")); } self.function_log.table_rate(name, metric, window) } pub async fn stream_udf_execution( &self, identity: Identity, cursor: CursorMs, ) -> anyhow::Result<(Vec<FunctionExecution>, CursorMs)> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("stream_udf_execution")); } Ok(self.function_log.stream(cursor).await) } pub async fn stream_function_logs( &self, identity: Identity, cursor: CursorMs, ) -> anyhow::Result<(Vec<FunctionExecutionPart>, CursorMs)> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("stream_function_logs")); } Ok(self.function_log.stream_parts(cursor).await) } pub async fn scheduled_job_lag( &self, identity: Identity, window: MetricsWindow, ) -> anyhow::Result<Timeseries> { if !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("scheduled_job_lag")); } self.function_log.scheduled_job_lag(window) } pub async fn cancel_all_jobs( &self, component_id: ComponentId, path: Option<CanonicalizedComponentFunctionPath>, identity: Identity, start_next_ts: Option<Timestamp>, end_next_ts: Option<Timestamp>, ) -> anyhow::Result<()> { loop { let count = self .execute_with_audit_log_events_and_occ_retries( identity.clone(), "application_cancel_all_jobs", |tx| { Self::_cancel_all_jobs( tx, component_id, path.clone(), *MAX_JOBS_CANCEL_BATCH, start_next_ts, end_next_ts, ) .into() }, ) .await?; if count < *MAX_JOBS_CANCEL_BATCH { break; } } Ok(()) } async fn _cancel_all_jobs( tx: &mut Transaction<RT>, component_id: ComponentId, path: Option<CanonicalizedComponentFunctionPath>, max_jobs: usize, start_next_ts: Option<Timestamp>, end_next_ts: Option<Timestamp>, ) -> anyhow::Result<(usize, Vec<DeploymentAuditLogEvent>)> { let count = SchedulerModel::new(tx, component_id.into()) .cancel_all(path, max_jobs, start_next_ts, end_next_ts) .await?; Ok((count, vec![])) } /// Commit a transaction and send audit log events to the log manager if the /// transaction commits successfully. pub async fn commit_with_audit_log_events( &self, mut transaction: Transaction<RT>, events: Vec<DeploymentAuditLogEvent>, write_source: impl Into<WriteSource>, ) -> anyhow::Result<Timestamp> { DeploymentAuditLogModel::new(&mut transaction) .insert(events.clone()) .await?; let ts = self.commit(transaction, write_source).await?; let logs = events .into_iter() .map(|event| { DeploymentAuditLogEvent::to_log_event(event, UnixTimestamp::from_nanos(ts.into())) }) .try_collect()?; self.log_manager_client.send_logs(logs); Ok(ts) } // TODO CX-5139 Remove this when audit logs are being processed in LogManager.
async fn insert_deployment_audit_log_events<'b, F, T>( tx: &mut Transaction<RT>, f: F, ) -> anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)> where T: Send, F: Send + Sync, F: for<'c> Fn( &'c mut Transaction<RT>, ) -> ShortBoxFuture<'c, 'b, anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>>, { let (t, events) = f(tx).0.await?; DeploymentAuditLogModel::new(tx) .insert(events.clone()) .await?; Ok((t, events)) } pub async fn execute_with_audit_log_events_and_occ_retries<'a, F, T>( &self, identity: Identity, write_source: impl Into<WriteSource>, f: F, ) -> anyhow::Result<T> where F: Send + Sync, T: Send + 'static, F: for<'b> Fn( &'b mut Transaction<RT>, ) -> ShortBoxFuture<'b, 'a, anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>>, { self.execute_with_audit_log_events_and_occ_retries_with_pause_client( identity, write_source, f, ) .await .map(|(t, _)| t) } pub async fn execute_with_audit_log_events_and_occ_retries_reporting_stats<'a, F, T>( &self, identity: Identity, write_source: impl Into<WriteSource>, f: F, ) -> anyhow::Result<(T, OccRetryStats)> where F: Send + Sync, T: Send + 'static, F: for<'b> Fn( &'b mut Transaction<RT>, ) -> ShortBoxFuture<'b, 'a, anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>>, { self.execute_with_audit_log_events_and_occ_retries_with_pause_client( identity, write_source, f, ) .await } pub async fn execute_with_audit_log_events_and_occ_retries_with_pause_client<'a, F, T>( &self, identity: Identity, write_source: impl Into<WriteSource>, f: F, ) -> anyhow::Result<(T, OccRetryStats)> where F: Send + Sync, T: Send + 'static, F: for<'b> Fn( &'b mut Transaction<RT>, ) -> ShortBoxFuture<'b, 'a, anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>>, { let db = self.database.clone(); let (ts, (t, events), stats) = db .execute_with_occ_retries(identity, FunctionUsageTracker::new(), write_source, |tx| { Self::insert_deployment_audit_log_events(tx, &f).into() }) .await?; // Send deployment audit logs // TODO CX-5139 Remove this when audit logs are being processed in LogManager. let logs = events .into_iter() .map(|event| { DeploymentAuditLogEvent::to_log_event(event, UnixTimestamp::from_nanos(ts.into())) }) .try_collect()?; self.log_manager_client.send_logs(logs); Ok((t, stats)) } pub async fn execute_with_occ_retries<'a, T, F>( &'a self, identity: Identity, usage: FunctionUsageTracker, write_source: impl Into<WriteSource>, f: F, ) -> anyhow::Result<(Timestamp, T)> where F: Send + Sync, T: Send + 'static, F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'b, 'a, anyhow::Result<T>>, { self.database .execute_with_occ_retries(identity, usage, write_source, f) .await .map(|(ts, t, _)| (ts, t)) } pub async fn lookup_function_handle( &self, identity: Identity, handle: FunctionHandle, ) -> anyhow::Result<CanonicalizedComponentFunctionPath> { let mut tx = self.begin(identity).await?; FunctionHandlesModel::new(&mut tx).lookup(handle).await } pub async fn canonicalized_function_path( &self, identity: Identity, component_id: ComponentId, path: Option<String>, reference: Option<String>, function_handle: Option<String>, ) -> anyhow::Result<CanonicalizedComponentFunctionPath> { if let Some(function_handle) = function_handle { let handle = function_handle.parse()?; return self.lookup_function_handle(identity, handle).await; } let reference = match (path, reference) { (None, None) => anyhow::bail!(ErrorMetadata::bad_request( "MissingUdfPathOrFunctionReference", "Missing UDF path or function reference. One must be provided." 
)), (Some(path), None) => Reference::Function(path.parse()?), (None, Some(reference)) => reference.parse()?, (Some(_), Some(_)) => anyhow::bail!(ErrorMetadata::bad_request( "InvalidUdfPathOrFunctionReference", "Both UDF path and function reference provided. Only one must be provided." )), }; // Reading from a separate transaction here is safe because the component id to // component path mapping is stable. let mut tx = self.begin(identity).await?; let mut components_model = ComponentsModel::new(&mut tx); let resource = components_model .resolve(component_id, None, &reference) .await?; let path = match resource { Resource::Function(path) => path, Resource::Value(_) | Resource::ResolvedSystemUdf(_) => { anyhow::bail!("Resource type not supported for internal query") }, }; Ok(path) } pub fn files_storage(&self) -> Arc<dyn Storage> { self.application_storage.files_storage.clone() } /// Add hidden primary key indexes for the given tables. Developers do not /// have access to these indexes. pub async fn add_primary_key_indexes( &self, identity: &Identity, indexes: BTreeMap<TableName, PrimaryKey>, ) -> anyhow::Result<()> { let indexes: BTreeMap<IndexName, IndexedFields> = indexes .into_iter() .map(|(table_name, primary_key)| { let index_name = IndexName::new_reserved( table_name, AIRBYTE_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(), )?; let index_fields = primary_key.into_indexed_fields(); Ok((index_name, index_fields)) }) .collect::<anyhow::Result<_>>()?; self._add_system_indexes(identity, indexes).await?; Ok(()) } pub async fn wait_for_primary_key_indexes_ready( &self, identity: Identity, indexes: BTreeSet<TableName>, ) -> anyhow::Result<()> { loop { let mut tx = self.begin(identity.clone()).await?; if AirbyteImportModel::new(&mut tx) .primary_key_indexes_ready(&indexes) .await? { return Ok(()); } let token = tx.into_token()?; let subscription = self.database.subscribe(token).await?; subscription.wait_for_invalidation().await; } } /// Return if the primary key indexes for the given tables have finished /// backfilling. pub async fn primary_key_indexes_ready( &self, identity: Identity, indexes: BTreeSet<TableName>, ) -> anyhow::Result<bool> { let mut tx = self.begin(identity).await?; AirbyteImportModel::new(&mut tx) .primary_key_indexes_ready(&indexes) .await } /// Inserts, replaces, or deletes the Airbyte record, depending on the sync /// mode. 
async fn process_record( &self, tx: &mut Transaction<RT>, record: AirbyteRecord, stream: &ValidatedAirbyteStream, ) -> anyhow::Result<()> { let table_name = record.table_name().clone(); let namespace = TableNamespace::by_component_TODO(); let deleted = record.deleted(); let object = record.into_object(); match stream { ValidatedAirbyteStream::Append => { UserFacingModel::new(tx, namespace) .insert(table_name.clone(), object.clone()) .await?; Ok::<(), anyhow::Error>(()) }, ValidatedAirbyteStream::Dedup(primary_key) => { // Get the current value of the record based on the primary key let mut range = Vec::new(); for field_path in primary_key.clone().into_indexed_fields().iter() { let value = object .get_path(field_path) .context(ErrorMetadata::bad_request( "MissingPrimaryKeyValue", format!("Missing value for primary key field: {field_path:?}."), ))?; let range_expression = IndexRangeExpression::Eq(field_path.clone(), value.clone().into()); range.push(range_expression) } let index_range = IndexRange { index_name: IndexName::new_reserved( table_name.clone(), AIRBYTE_PRIMARY_KEY_INDEX_DESCRIPTOR.clone(), )?, range, order: Order::Asc, }; let query = common::query::Query::index_range(index_range); let mut query_stream = ResolvedQuery::new(tx, namespace, query)?; // Replace or delete the record or insert if there is no existing record that // matches this value for the primary key. if let Some(doc) = query_stream.expect_at_most_one(tx).await? { let doc_id = DeveloperDocumentId::from(doc.id()); if deleted { UserFacingModel::new(tx, namespace).delete(doc_id).await?; } else { UserFacingModel::new(tx, namespace) .replace(doc_id, object.clone()) .await?; } } else { UserFacingModel::new(tx, namespace) .insert(table_name.clone(), object.clone()) .await?; } Ok(()) }, } } /// Insert airbyte record messages into the table for the stream. Returns /// the number of documents inserted. 
pub async fn import_airbyte_records( &self, identity: &Identity, records: Vec<AirbyteRecord>, tables: BTreeMap<TableName, ValidatedAirbyteStream>, ) -> anyhow::Result<u64> { let mut count = 0; let mut tx = self.begin(identity.clone()).await?; for record in records { let table_name = record.table_name(); let stream = tables.get(table_name).context(ErrorMetadata::bad_request( "MissingStream", format!("Missing stream for table {table_name}"), ))?; let insert_fut = self.process_record(&mut tx, record.clone(), stream); match insert_fut.await { Ok(()) => {}, Err(e) if e.is_pagination_limit() => { self.commit(tx, "airbyte_write_page").await?; tx = self.begin(identity.clone()).await?; self.process_record(&mut tx, record, stream).await?; }, Err(e) => anyhow::bail!(e), } count += 1; } self.commit(tx, "app_private_import_airbyte").await?; Ok(count) } pub async fn apply_fivetran_operations( &self, identity: &Identity, rows: Vec<BatchWriteRow>, ) -> anyhow::Result<()> { let mut tx = self.begin(identity.clone()).await?; let mut model = FivetranImportModel::new(&mut tx); for row in rows { match model.apply_operation(row.clone()).await { Ok(()) => {}, Err(e) if e.is_pagination_limit() => { self.commit(tx, "fivetran_write_page").await?; tx = self.begin(identity.clone()).await?; model = FivetranImportModel::new(&mut tx); model.apply_operation(row).await?; }, Err(e) => anyhow::bail!(e), } } self.commit(tx, "app_fivetran_import").await?; Ok(()) } pub async fn fivetran_truncate( &self, identity: &Identity, table_name: TableName, delete_before: Option<DateTime<Utc>>, delete_type: DeleteType, ) -> anyhow::Result<()> { let mut done = false; while !done { let mut tx = self.begin(identity.clone()).await?; if !TableModel::new(&mut tx).table_exists(TableNamespace::Global, &table_name) { // Accept the truncate as a no-op if the table doesn't exist return Ok(()); } let mut query: ResolvedQuery<_> = FivetranImportModel::new(&mut tx) .synced_query(&table_name, &delete_before) .await?; loop { let res: anyhow::Result<()> = try { match query.next(&mut tx, None).await? { Some(doc) => { FivetranImportModel::new(&mut tx) .truncate_document(doc, delete_type) .await?; }, None => { done = true; break; }, } }; if let Err(e) = res { if e.is_pagination_limit() { // Need a new transaction: commit what we already have and continue break; } else { anyhow::bail!(e) } } } self.commit(tx, "app_fivetran_truncate").await?; } Ok(()) } pub async fn get_schema( &self, namespace: TableNamespace, identity: &Identity, ) -> anyhow::Result<Option<Arc<DatabaseSchema>>> { let mut tx = self.begin(identity.clone()).await?; let mut model = SchemaModel::new(&mut tx, namespace); Ok(model .get_by_state(SchemaState::Active) .await? .map(|(_id, schema)| schema)) } pub async fn fivetran_create_table( &self, identity: &Identity, table_definition: TableDefinition, ) -> anyhow::Result<()> { let table_name = table_definition.table_name; // Add the indexes to the table. let indexes: BTreeMap<IndexName, IndexedFields> = table_definition .indexes .into_iter() .map(|(descriptor, fields)| { let index_name = IndexName::new_reserved(table_name.clone(), descriptor)?; let index_fields = fields.fields; Ok((index_name, index_fields)) }) .collect::<anyhow::Result<_>>()?; self._add_system_indexes(identity, indexes).await?; // Wait for the indexes to be ready. loop { let mut tx = self.begin(identity.clone()).await?; if IndexModel::new(&mut tx) .indexes_ready( &FIVETRAN_PRIMARY_KEY_INDEX_DESCRIPTOR, &btreeset! { table_name.clone() }, ) .await?
{ return Ok(()); } let token = tx.into_token()?; let subscription = self.database.subscribe(token).await?; subscription.wait_for_invalidation().await; } } pub async fn shutdown(&self) -> anyhow::Result<()> { self.log_manager_client.shutdown().await?; self.table_summary_worker.shutdown().await?; self.system_table_cleanup_worker.lock().shutdown(); self.schema_worker.lock().shutdown(); let index_worker = self.index_worker.lock().take(); if let Some(index_worker) = index_worker { shutdown_and_join(index_worker).await?; } self.search_worker.lock().shutdown(); self.search_and_vector_bootstrap_worker.lock().shutdown(); self.fast_forward_worker.lock().shutdown(); let export_worker = self.export_worker.lock().take(); if let Some(export_worker) = export_worker { shutdown_and_join(export_worker).await?; } let snapshot_import_worker = self.snapshot_import_worker.lock().take(); if let Some(snapshot_import_worker) = snapshot_import_worker { shutdown_and_join(snapshot_import_worker).await?; } self.runner.shutdown().await?; self.scheduled_job_runner.shutdown(); self.cron_job_executor.lock().shutdown(); self.database.shutdown().await?; let migration_worker = self.migration_worker.lock().take(); if let Some(migration_worker) = migration_worker { shutdown_and_join(migration_worker).await?; } tracing::info!("Application shut down"); Ok(()) } }
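// NOTE (editorial usage sketch, not part of the upstream file): the client-driven
// snapshot import methods above are intended to be called in sequence by an admin
// identity. A hypothetical caller holding an `app: Application<RT>`, a single
// `Bytes` part, and `ImportFormat`/`ImportMode`/`ComponentPath` values might drive
// them roughly like this (this assumes `ClientDrivenUploadToken` can be cloned; in
// practice the token round-trips through the client between HTTP calls):
//
//     let token = app.start_upload_for_snapshot_import(identity.clone()).await?;
//     let part = app
//         .upload_part_for_snapshot_import(identity.clone(), token.clone(), 1, part_bytes)
//         .await?;
//     let import_id = app
//         .import_finish_upload(identity, format, mode, component_path, token, vec![part])
//         .await?;
//
// Each method re-checks `identity.is_admin()`, so a non-admin identity fails at the
// first step with an `InvalidImport` error.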
