Skip to main content
Glama

Convex MCP server

Official
by get-convex
test_helpers.rs (51.1 kB)
use std::{ any::Any, collections::BTreeMap, fs::File, io::Read, sync::{ Arc, Exclusive, LazyLock, }, time::Duration, }; use anyhow::Context; use async_trait::async_trait; use common::{ bootstrap_model::{ components::handles::FunctionHandle, index::{ database_index::IndexedFields, IndexMetadata, }, }, components::{ CanonicalizedComponentFunctionPath, ComponentFunctionPath, ComponentId, ComponentPath, PublicFunctionPath, }, errors::JsError, execution_context::ExecutionContext, fastrace_helpers::EncodedSpan, http::{ fetch::ProxiedFetchClient, RoutedHttpPath, }, log_lines::LogLines, pause::HoldGuard, persistence::Persistence, query_journal::QueryJournal, runtime::{ testing::TestRuntime, Runtime, UnixTimestamp, }, testing::TestPersistence, types::{ AllowedVisibility, ModuleEnvironment, UdfType, }, value::ConvexValue, version::Version, }; use database::{ test_helpers::{ DbFixtures, DbFixturesArgs, }, text_index_worker::flusher::backfill_text_indexes, vector_index_worker::flusher::backfill_vector_indexes, Database, FollowerRetentionManager, IndexModel, IndexWorker, Token, Transaction, }; use file_storage::TransactionalFileStorage; use futures::{ select, Future, FutureExt, }; use keybroker::{ Identity, InstanceSecret, KeyBroker, DEV_INSTANCE_NAME, DEV_SECRET, }; use maplit::btreemap; use model::{ components::handles::FunctionHandlesModel, config::{ module_loader::{ test_module_loader::UncachedModuleLoader, ModuleLoader, }, types::{ ConfigMetadata, ModuleConfig, }, ConfigModel, }, file_storage::{ types::FileStorageEntry, FileStorageId, }, modules::module_versions::ModuleSource, scheduled_jobs::VirtualSchedulerModel, source_packages::{ types::SourcePackage, upload_download::upload_package, }, test_helpers::DbFixturesWithModel, udf_config::{ types::UdfConfig, UdfConfigModel, }, virtual_system_mapping, }; use rand::Rng; use search::searcher::InProcessSearcher; use serde::Deserialize; use serde_json::Value as JsonValue; use storage::{ LocalDirStorage, Storage, }; use 
sync_types::{ types::SerializedArgs, UdfPath, }; use tokio::sync::{ mpsc, oneshot, }; use udf::{ environment::{ CONVEX_ORIGIN, CONVEX_SITE, }, helpers::parse_udf_args, validation::{ validate_schedule_args, ValidatedHttpPath, ValidatedPathAndArgs, }, ActionOutcome, FunctionOutcome, FunctionResult, HttpActionRequest, HttpActionResponse, HttpActionResponsePart, HttpActionResponseStreamer, HttpActionResult, UdfOutcome, }; use usage_tracking::FunctionUsageStats; use value::{ id_v6::DeveloperDocumentId, ConvexArray, ConvexObject, TableName, TableNamespace, }; use vector::{ PublicVectorSearchQueryResult, VectorSearch, }; use crate::{ bundled_js::UDF_TEST_BUNDLE_PATH, client::{ initialize_v8, EnvironmentData, IsolateWorker, Request, RequestType, SharedIsolateHeapStats, UdfCallback, UdfRequest, }, concurrency_limiter::ConcurrencyLimiter, isolate2::runner::{ run_isolate_v2_udf, SeedData, }, metrics::queue_timer, ActionCallbacks, IsolateClient, IsolateConfig, }; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct Bundle { path: String, source: String, source_map: Option<String>, environment: ModuleEnvironment, } #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct PushOutput { modules: Vec<Bundle>, } fn load_test_source() -> anyhow::Result<Vec<ModuleConfig>> { let mut file = File::open(UDF_TEST_BUNDLE_PATH)?; let mut contents = vec![]; file.read_to_end(&mut contents)?; let output: PushOutput = serde_json::from_slice(&contents)?; let mut modules = BTreeMap::new(); for module in output.modules { let config = ModuleConfig { path: module.path.parse()?, source: ModuleSource::new(&module.source), source_map: module.source_map, environment: module.environment, }; assert!(modules.insert(module.path, config).is_none()); } Ok(modules.into_values().collect()) } pub static TEST_SOURCE: LazyLock<Vec<ModuleConfig>> = LazyLock::new(|| load_test_source().unwrap()); pub static TEST_SOURCE_ISOLATE_ONLY: LazyLock<Vec<ModuleConfig>> = LazyLock::new(|| { 
TEST_SOURCE .clone() .into_iter() .filter(|m| m.environment == ModuleEnvironment::Isolate) .collect() }); pub fn test_environment_data<RT: Runtime>(rt: RT) -> anyhow::Result<EnvironmentData<RT>> { let key_broker = KeyBroker::new(DEV_INSTANCE_NAME, InstanceSecret::try_from(DEV_SECRET)?)?; let modules_storage = Arc::new(LocalDirStorage::new(rt.clone())?); let module_loader = Arc::new(UncachedModuleLoader { modules_storage }); let storage = Arc::new(LocalDirStorage::new(rt.clone())?); let convex_origin = "http://127.0.0.1:8000".into(); let file_storage = TransactionalFileStorage::new(rt, storage, convex_origin); let default_system_env_vars = btreemap! { CONVEX_ORIGIN.clone() => "https://carnitas.convex.cloud".parse()?, CONVEX_SITE.clone() => "https://carnitas.convex.site".parse()? }; Ok(EnvironmentData { key_broker, default_system_env_vars, file_storage, module_loader, }) } /// Indicates that a V8 op panicked and we should panic the test #[derive(thiserror::Error)] #[error("panicked")] // Wrap the error in `Exclusive` to make this type `Sync` so that it can be // wrapped by anyhow pub struct PanicError(Exclusive<Box<dyn Any + Send>>); impl PanicError { pub fn new(panic: Box<dyn Any + Send>) -> Self { Self(Exclusive::new(panic)) } pub fn into_inner(self) -> Box<dyn Any + Send> { self.0.into_inner() } } impl std::fmt::Debug for PanicError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("PanicError").finish() } } pub struct UdfTest<RT: Runtime, P: Persistence> { pub database: Database<RT>, pub isolate: IsolateClient<RT>, pub persistence: Arc<P>, pub rt: RT, pub key_broker: KeyBroker, pub module_loader: Arc<dyn ModuleLoader<RT>>, search_storage: Arc<dyn Storage>, file_storage: TransactionalFileStorage<RT>, environment_data: EnvironmentData<RT>, isolate_v2_enabled: bool, } impl<RT: Runtime, P: Persistence> Clone for UdfTest<RT, P> { fn clone(&self) -> Self { Self { database: self.database.clone(), isolate: self.isolate.clone(), 
persistence: self.persistence.clone(),
            rt: self.rt.clone(),
            key_broker: self.key_broker.clone(),
            module_loader: self.module_loader.clone(),
            search_storage: self.search_storage.clone(),
            file_storage: self.file_storage.clone(),
            environment_data: self.environment_data.clone(),
            isolate_v2_enabled: self.isolate_v2_enabled,
        }
    }
}

impl<RT: Runtime, P: Persistence> UdfTest<RT, P> {
    /// Builds a harness around `persistence` and pushes `modules` into it.
    ///
    /// The modules are analyzed by the isolate, uploaded as a source package
    /// and applied through `ConfigModel` in a committed system transaction.
    ///
    /// Returns `Ok(Err(js_error))` when module analysis fails with a
    /// `JsError`; every other failure is an `anyhow` error. Only modules with
    /// `ModuleEnvironment::Isolate` are accepted.
    async fn new(
        modules: Vec<ModuleConfig>,
        rt: RT,
        persistence: Arc<P>,
        config: UdfTestConfig,
        max_isolate_workers: usize,
    ) -> anyhow::Result<Result<Self, JsError>> {
        // Validate the input before building any fixtures or isolate workers.
        anyhow::ensure!(
            modules
                .iter()
                .all(|m| m.environment == ModuleEnvironment::Isolate),
            "Only isolate modules allowed in isolate tests. Please use application level tests to \
             test Node modules"
        );

        let DbFixtures {
            db: database,
            search_storage,
            ..
        } = DbFixtures::new_with_model_and_args(
            &rt,
            DbFixturesArgs {
                tp: Some(persistence.clone()),
                searcher: Some(Arc::new(InProcessSearcher::new(rt.clone()).await?)),
                virtual_system_mapping: virtual_system_mapping().clone(),
                ..Default::default()
            },
        )
        .await?;
        let handle = database.start_search_and_vector_bootstrap();
        handle.join().await?;
        let key_broker = KeyBroker::new(DEV_INSTANCE_NAME, InstanceSecret::try_from(DEV_SECRET)?)?;
        // `modules_storage` is shared: the loader reads from it and the source
        // package is uploaded into it below.
        let modules_storage = Arc::new(LocalDirStorage::new(rt.clone())?);
        let module_loader = Arc::new(UncachedModuleLoader {
            modules_storage: modules_storage.clone(),
        });
        let storage = Arc::new(LocalDirStorage::new(rt.clone())?);
        let convex_origin = "http://127.0.0.1:8000".into();
        let file_storage = TransactionalFileStorage::new(rt.clone(), storage, convex_origin);
        let environment_data = EnvironmentData {
            key_broker: key_broker.clone(),
            default_system_env_vars: btreemap! {
                CONVEX_ORIGIN.clone() => "https://carnitas.convex.cloud".parse()?,
                CONVEX_SITE.clone() => "https://carnitas.convex.site".parse()?
            },
            file_storage: file_storage.clone(),
            module_loader: module_loader.clone(),
        };
        let isolate = IsolateClient::new(
            rt.clone(),
            100,
            max_isolate_workers,
            Some(config.isolate_config),
        )?;
        let udf_config = UdfConfig::new_for_test(&rt, config.udf_server_version);
        let modules_by_path: BTreeMap<_, _> = modules
            .iter()
            .map(|c| (c.path.clone().canonicalize(), c.clone()))
            .collect();
        // A `JsError` from analysis is a user-level failure and is handed back
        // in the inner `Result`.
        let analyze_results = match isolate
            .analyze(
                udf_config.clone(),
                modules_by_path.clone(),
                BTreeMap::new(),
                DEV_INSTANCE_NAME.to_string(),
            )
            .await?
        {
            Ok(analyze_results) => analyze_results,
            Err(err) => return Ok(Err(err)),
        };
        let (storage_key, sha256, package_size) = upload_package(
            modules_by_path
                .iter()
                .map(|(path, m)| (path.clone(), m))
                .collect(),
            modules_storage,
            None,
        )
        .await?;
        let mut tx = database.begin(Identity::system()).await?;
        ConfigModel::new(&mut tx, ComponentId::test_user())
            .apply(
                ConfigMetadata::new(),
                modules,
                udf_config,
                Some(SourcePackage {
                    storage_key,
                    sha256,
                    package_size,
                    external_deps_package_id: None,
                    node_version: None,
                }),
                analyze_results,
                None,
            )
            .await?;
        database.commit(tx).await?;
        Ok(Ok(Self {
            database,
            isolate,
            persistence,
            rt,
            key_broker,
            search_storage,
            module_loader,
            file_storage,
            environment_data,
            isolate_v2_enabled: false,
        }))
    }

    /// Builds a harness over the caller-supplied `persistence`, loaded with
    /// the isolate-only test modules. Panics if module analysis reports a
    /// `JsError`.
    pub async fn with_persistence(
        rt: RT,
        persistence: Arc<P>,
        config: UdfTestConfig,
        max_isolate_workers: usize,
    ) -> anyhow::Result<Self> {
        let result = Self::new(
            TEST_SOURCE_ISOLATE_ONLY.clone(),
            rt,
            persistence,
            config,
            max_isolate_workers,
        )
        .await?
.expect("Unexpected JSError");
        Ok(result)
    }

    /// Makes subsequent queries and mutations run through
    /// `run_isolate_v2_udf` instead of the `IsolateClient`.
    pub fn enable_isolate_v2(&mut self) {
        self.isolate_v2_enabled = true;
    }

    /// Adds a backfilling application index `name` over the single field
    /// `field`, commits it, and runs a terminating `IndexWorker`.
    ///
    /// The index is not explicitly enabled here.
    pub async fn create_index(&self, name: &str, field: &str) -> anyhow::Result<()> {
        let mut tx = self.database.begin(Identity::system()).await?;
        let index_name = name.parse()?;
        let index = IndexMetadata::new_backfilling(
            *tx.begin_timestamp(),
            index_name,
            IndexedFields::try_from(vec![field.parse()?])?,
        );
        IndexModel::new(&mut tx)
            .add_application_index(TableNamespace::test_user(), index)
            .await?;
        self.database.commit(tx).await?;
        let retention_manager =
            FollowerRetentionManager::new(self.rt.clone(), self.persistence.reader()).await?;
        IndexWorker::new_terminating(
            self.rt.clone(),
            self.persistence.clone(),
            Arc::new(retention_manager),
            self.database.clone(),
        )
        .await
    }

    /// Runs a mutation as the system identity and returns its value.
    pub async fn mutation(
        &self,
        udf_path: &str,
        args: ConvexObject,
    ) -> anyhow::Result<ConvexValue> {
        self.mutation_with_identity(udf_path, args, Identity::system())
            .await
    }

    /// Runs a mutation that is expected to fail and returns its `JsError`.
    ///
    /// # Panics
    ///
    /// Panics if the mutation succeeds.
    pub async fn mutation_js_error(
        &self,
        udf_path: &str,
        args: ConvexObject,
    ) -> anyhow::Result<JsError> {
        let outcome: UdfOutcome = self
            .raw_mutation(
                udf_path,
                vec![ConvexValue::Object(args)],
                Identity::system(),
            )
            .await?;
        Ok(outcome.result.unwrap_err())
    }

    /// Runs a mutation as the system identity and returns its log lines.
    pub async fn mutation_log_lines(
        &self,
        udf_path: &str,
        args: ConvexObject,
    ) -> anyhow::Result<LogLines> {
        let (_, outcome) = self
            .mutation_outcome(udf_path, args, Identity::system())
            .await?;
        Ok(outcome.log_lines)
    }

    /// Runs a mutation as `identity` and returns its value.
    pub async fn mutation_with_identity(
        &self,
        udf_path: &str,
        args: ConvexObject,
        identity: Identity,
    ) -> anyhow::Result<ConvexValue> {
        let (v, _) = self.mutation_outcome(udf_path, args, identity).await?;
        Ok(v)
    }

    /// Runs a mutation and returns both its unpacked value and the full
    /// `UdfOutcome`.
    ///
    /// # Errors
    ///
    /// Returns an error if the mutation failed with a user (`JsError`) error;
    /// use `mutation_js_error` or `raw_mutation` when a failure is expected.
    pub async fn mutation_outcome(
        &self,
        udf_path: &str,
        args: ConvexObject,
        identity: Identity,
    ) -> anyhow::Result<(ConvexValue, UdfOutcome)> {
        let outcome = self
            .raw_mutation(udf_path, vec![ConvexValue::Object(args)], identity)
            .await?;
        // Propagate the descriptive error instead of panicking with it: this
        // function already returns `anyhow::Result`.
        let value = outcome
            .result
            .as_ref()
            .map(|v| v.unpack())
            .map_err(|e| {
                anyhow::anyhow!(
                    "mutation failed with user error. If that is intended, call mutation_js_error \
                     or raw_mutation instead. {e:?}"
                )
            })?;
        Ok((value, outcome))
    }

    /// Validates and runs a mutation, commits its transaction, and returns
    /// the raw `UdfOutcome` (which may itself carry a `JsError`).
    ///
    /// A validation failure is turned into an outcome via
    /// `UdfOutcome::from_error` without running the function.
    pub async fn raw_mutation(
        &self,
        udf_path: &str,
        args: Vec<ConvexValue>,
        identity: Identity,
    ) -> anyhow::Result<UdfOutcome> {
        // Bump time before running a mutation so we have a higher creation time than
        // previous mutations.
        self.rt.wait(Duration::from_micros(10)).await;
        let mut tx = self.database.begin(identity.clone()).await?;
        let path = ComponentFunctionPath {
            component: ComponentPath::test_user(),
            udf_path: udf_path.parse()?,
        };
        let canonicalized_path = path.canonicalize();
        let args_array = ConvexArray::try_from(args)?;
        let validated_path_or_err = ValidatedPathAndArgs::new(
            AllowedVisibility::PublicOnly,
            &mut tx,
            PublicFunctionPath::Component(canonicalized_path.clone()),
            args_array.clone(),
            UdfType::Mutation,
        )
        .await?;
        let path_and_args = match validated_path_or_err {
            Err(js_error) => {
                return UdfOutcome::from_error(
                    js_error,
                    canonicalized_path,
                    args_array,
                    identity.into(),
                    self.rt.clone(),
                    None,
                );
            },
            Ok(path_and_args) => path_and_args,
        };
        if self.isolate_v2_enabled {
            let rng_seed = self.rt.rng().random();
            let (tx, outcome) = run_isolate_v2_udf(
                self.rt.clone(),
                tx,
                self.module_loader.clone(),
                SeedData {
                    rng_seed,
                    unix_timestamp: self.rt.unix_timestamp(),
                },
                UdfType::Mutation,
                path_and_args,
                self.key_broker.clone(),
                ExecutionContext::new_for_test(),
                QueryJournal::new(),
            )
            .await?;
            // Reuse the path canonicalized above, as the non-v2 branch does,
            // rather than parsing `udf_path` a second time.
            self.database
                .commit_with_write_source(tx, Some(canonicalized_path.udf_path.into()))
                .await?;
            Ok(outcome)
        } else {
            let (tx, outcome) = self
                .isolate
                .execute_udf(
                    UdfType::Mutation,
                    path_and_args,
                    tx,
                    QueryJournal::new(),
                    ExecutionContext::new_for_test(),
                    self.environment_data.clone(),
                    0,
                    DEV_INSTANCE_NAME.to_string(),
                    None,
                )
                .await?;
            let FunctionOutcome::Mutation(outcome) = outcome else {
                anyhow::bail!("Called raw_mutation on a non-mutation");
            };
            self.database
.commit_with_write_source(tx, Some(canonicalized_path.udf_path.into()))
                .await?;
            Ok(outcome)
        }
    }

    /// Runs a query as the system identity and returns its value.
    pub async fn query(&self, udf_path: &str, args: ConvexObject) -> anyhow::Result<ConvexValue> {
        self.query_with_identity(udf_path, args, Identity::system())
            .await
    }

    /// Runs a query that is expected to fail and returns its `JsError`.
    ///
    /// # Panics
    ///
    /// Panics if the query succeeds.
    pub async fn query_js_error(
        &self,
        udf_path: &str,
        args: ConvexObject,
    ) -> anyhow::Result<JsError> {
        let (outcome, _) = self
            .raw_query(
                udf_path,
                vec![ConvexValue::Object(args)],
                Identity::system(),
                None,
            )
            .await?;
        Ok(outcome.result.unwrap_err())
    }

    /// Runs a query as the system identity and returns its log lines.
    pub async fn query_log_lines(
        &self,
        udf_path: &str,
        args: ConvexObject,
    ) -> anyhow::Result<LogLines> {
        let (_, outcome) = self
            .query_outcome(udf_path, args, Identity::system())
            .await?;
        Ok(outcome.log_lines)
    }

    /// Runs a query as `identity` and returns its value.
    pub async fn query_with_identity(
        &self,
        udf_path: &str,
        args: ConvexObject,
        identity: Identity,
    ) -> anyhow::Result<ConvexValue> {
        let (v, _) = self.query_outcome(udf_path, args, identity).await?;
        Ok(v)
    }

    /// Execute the query and also return the corresponding UdfOutcome struct.
    ///
    /// # Errors
    ///
    /// Returns an error if the query failed with a user (`JsError`) error;
    /// use `query_js_error` or `raw_query` when a failure is expected.
    pub async fn query_outcome(
        &self,
        udf_path: &str,
        args: ConvexObject,
        identity: Identity,
    ) -> anyhow::Result<(ConvexValue, UdfOutcome)> {
        let (outcome, _) = self
            .raw_query(udf_path, vec![ConvexValue::Object(args)], identity, None)
            .await?;
        // Propagate the descriptive error instead of panicking with it: this
        // function already returns `anyhow::Result`.
        let value = outcome
            .result
            .as_ref()
            .map(|v| v.unpack())
            .map_err(|e| {
                anyhow::anyhow!(
                    "query failed with user error. If that is intended, call query_js_error or \
                     raw_query instead. {e:?}"
                )
            })?;
        Ok((value, outcome))
    }

    /// Validates and runs a query, returning the raw `UdfOutcome` together
    /// with the token obtained from the transaction.
    ///
    /// `journal` seeds the query journal; a fresh `QueryJournal` is used when
    /// it is `None`. A validation failure is turned into an outcome via
    /// `UdfOutcome::from_error` without running the function.
    pub async fn raw_query(
        &self,
        udf_path: &str,
        args: Vec<ConvexValue>,
        identity: Identity,
        journal: Option<QueryJournal>,
    ) -> anyhow::Result<(UdfOutcome, Token)> {
        let mut tx = self.database.begin(identity.clone()).await?;
        let path = ComponentFunctionPath {
            component: ComponentPath::test_user(),
            udf_path: udf_path.parse()?,
        };
        let canonicalized_path = path.canonicalize();
        let args_array = ConvexArray::try_from(args)?;
        let validated_path_or_err = ValidatedPathAndArgs::new(
            AllowedVisibility::PublicOnly,
            &mut tx,
            PublicFunctionPath::Component(canonicalized_path.clone()),
            args_array.clone(),
            UdfType::Query,
        )
        .await?;
        let path_and_args = match validated_path_or_err {
            Err(js_error) => {
                let outcome = UdfOutcome::from_error(
                    js_error,
                    canonicalized_path,
                    args_array,
                    identity.into(),
                    self.rt.clone(),
                    None,
                )?;
                return Ok((outcome, tx.into_token()?));
            },
            Ok(path_and_args) => path_and_args,
        };
        if self.isolate_v2_enabled {
            let rng_seed = self.rt.rng().random();
            let (tx, outcome) = run_isolate_v2_udf(
                self.rt.clone(),
                tx,
                self.module_loader.clone(),
                SeedData {
                    rng_seed,
                    unix_timestamp: self.rt.unix_timestamp(),
                },
                UdfType::Query,
                path_and_args,
                self.key_broker.clone(),
                ExecutionContext::new_for_test(),
                journal.unwrap_or_else(QueryJournal::new),
            )
            .await?;
            // Ensure the transaction is readonly by turning it into a subscription token.
            let token = tx.into_token()?;
            Ok((outcome, token))
        } else {
            let (tx, outcome) = self
                .isolate
                .execute_udf(
                    UdfType::Query,
                    path_and_args,
                    tx,
                    journal.unwrap_or_else(QueryJournal::new),
                    ExecutionContext::new_for_test(),
                    self.environment_data.clone(),
                    0,
                    DEV_INSTANCE_NAME.to_string(),
                    None,
                )
                .await?;
            // Ensure the transaction is readonly by turning it into a subscription token.
let token = tx.into_token()?;
            let FunctionOutcome::Query(query_outcome) = outcome else {
                anyhow::bail!("Called raw_query on a non-query");
            };
            Ok((query_outcome, token))
        }
    }

    /// Run a query without going through `ValidatedPathAndArgs::new` (the
    /// path and args are built with `ValidatedPathAndArgs::new_for_tests`),
    /// and return the JS error it produces.
    ///
    /// This can be useful for testing errors from lower layers.
    ///
    /// # Panics
    ///
    /// Panics if the query succeeds.
    pub async fn query_js_error_no_validation(
        &self,
        udf_path: &str,
        args: ConvexObject,
    ) -> anyhow::Result<JsError> {
        let mut tx = self.database.begin(Identity::system()).await?;
        // The stored UDF config supplies the server version handed to
        // `new_for_tests`.
        let stored_config = UdfConfigModel::new(&mut tx, TableNamespace::test_user())
            .get()
            .await?;
        let server_version = stored_config
            .context("Missing udf_config")?
            .server_version
            .clone();
        let parsed_path: UdfPath = udf_path.parse()?;
        let wrapped_args = ConvexArray::try_from(vec![ConvexValue::Object(args)])?;
        let path_and_args = ValidatedPathAndArgs::new_for_tests(
            parsed_path.canonicalize(),
            wrapped_args,
            Some(server_version),
        );

        if self.isolate_v2_enabled {
            let rng_seed = self.rt.rng().random();
            let seed = SeedData {
                rng_seed,
                unix_timestamp: self.rt.unix_timestamp(),
            };
            let (_, udf_outcome) = run_isolate_v2_udf(
                self.rt.clone(),
                tx,
                self.module_loader.clone(),
                seed,
                UdfType::Query,
                path_and_args,
                self.key_broker.clone(),
                ExecutionContext::new_for_test(),
                QueryJournal::new(),
            )
            .await?;
            return Ok(udf_outcome.result.unwrap_err());
        }

        let (_, function_outcome) = self
            .isolate
            .execute_udf(
                UdfType::Query,
                path_and_args,
                tx,
                QueryJournal::new(),
                ExecutionContext::new_for_test(),
                self.environment_data.clone(),
                0,
                DEV_INSTANCE_NAME.to_string(),
                None,
            )
            .await?;
        let FunctionOutcome::Query(query_outcome) = function_outcome else {
            return Err(anyhow::anyhow!(
                "Called query_js_error_no_validation on a non-query"
            ));
        };
        Ok(query_outcome.result.unwrap_err())
    }

    /// Adds `index` as an application index in the test-user namespace and
    /// commits the change. No backfill is run here.
    pub async fn add_index(&self, index: IndexMetadata<TableName>) -> anyhow::Result<()> {
        let mut tx = self.database.begin(Identity::system()).await?;
        let mut index_model = IndexModel::new(&mut tx);
        index_model
            .add_application_index(TableNamespace::test_user(), index)
            .await?;
        self.database.commit(tx).await?;
        Ok(())
    }

    pub async fn
backfill_indexes(&self) -> anyhow::Result<()> { let retention_validator = Arc::new( FollowerRetentionManager::new(self.rt.clone(), self.persistence.reader()).await?, ); IndexWorker::new_terminating( self.rt.clone(), self.persistence.clone(), retention_validator, self.database.clone(), ) .await?; self.enable_backfilled_indexes().await?; Ok(()) } pub async fn backfill_text_indexes(&self) -> anyhow::Result<()> { let segment_term_metadata_fetcher = Arc::new(InProcessSearcher::new(self.rt.clone()).await?); backfill_text_indexes( self.rt.clone(), self.database.clone(), self.persistence.reader(), self.search_storage.clone(), segment_term_metadata_fetcher, ) .await?; self.enable_backfilled_indexes().await } pub async fn backfill_vector_indexes(&self) -> anyhow::Result<()> { backfill_vector_indexes( self.rt.clone(), self.database.clone(), self.persistence.reader(), self.search_storage.clone(), ) .await?; self.enable_backfilled_indexes().await } async fn enable_backfilled_indexes(&self) -> anyhow::Result<()> { let mut tx = self.database.begin_system().await?; let indexes: Vec<IndexMetadata<TableName>> = IndexModel::new(&mut tx) .get_application_indexes(TableNamespace::test_user()) .await? .into_iter() .map(|doc| doc.into_value()) .filter(|index| !index.config.is_enabled()) .collect(); for index in indexes { IndexModel::new(&mut tx) .enable_index_for_testing(TableNamespace::test_user(), &index.name) .await? } self.database.commit(tx).await?; Ok(()) } pub async fn http_action( &self, udf_path: &str, http_request: HttpActionRequest, identity: Identity, ) -> anyhow::Result<HttpActionResponse> { let (result, _log_lines) = self._http_action(udf_path, http_request, identity).await?; match result { Ok(r) => Ok(r), Err(e) => anyhow::bail!( "action failed with user error. If that is intended, call http_action_js_error or \ raw_http_action instead. 
{e:?}" ), } } pub async fn http_action_with_log_lines( &self, udf_path: &str, http_request: HttpActionRequest, identity: Identity, ) -> anyhow::Result<(HttpActionResponse, LogLines)> { let (result, log_lines) = self._http_action(udf_path, http_request, identity).await?; match result { Ok(r) => Ok((r, log_lines)), Err(e) => anyhow::bail!( "action failed with user error. If that is intended, call http_action_js_error or \ raw_http_action instead. {e:?}" ), } } pub async fn http_action_js_error( &self, udf_path: &str, http_request: HttpActionRequest, identity: Identity, ) -> anyhow::Result<JsError> { let (result, _log_lines) = self._http_action(udf_path, http_request, identity).await?; Ok(result.unwrap_err()) } async fn _http_action( &self, udf_path: &str, http_request: HttpActionRequest, identity: Identity, ) -> anyhow::Result<(Result<HttpActionResponse, JsError>, LogLines)> { let (response_sender, mut response_receiver) = mpsc::unbounded_channel(); let http_response_streamer = HttpActionResponseStreamer::new(response_sender); let (outcome, log_lines) = self .raw_http_action(udf_path, http_request, identity, http_response_streamer) .await?; let mut response_head = None; let mut body = vec![]; while let Some(part) = response_receiver.recv().await { match part { HttpActionResponsePart::BodyChunk(bytes) => body.extend(bytes), HttpActionResponsePart::Head(head) => response_head = Some(head), } } let response = match outcome { HttpActionResult::Error(e) => Err(e), HttpActionResult::Streamed => { let response_head = response_head.unwrap(); Ok(HttpActionResponse { body: Some(body), status: response_head.status, headers: response_head.headers, }) }, }; Ok((response, log_lines)) } pub async fn raw_http_action( &self, udf_path: &str, http_request: HttpActionRequest, identity: Identity, http_response_streamer: HttpActionResponseStreamer, ) -> anyhow::Result<(HttpActionResult, LogLines)> { let app = Arc::new(self.clone()); let mut tx = 
self.database.begin(identity.clone()).await?;
        let path: UdfPath = udf_path.parse()?;
        // A path that fails validation is reported as an action error with no
        // log lines, not as an `anyhow` failure.
        let validation =
            ValidatedHttpPath::new_for_tests(&mut tx, path.canonicalize(), None).await?;
        let validated_path = match validation {
            Err(e) => return Ok((HttpActionResult::Error(e), vec![].into())),
            Ok(validated_path) => validated_path,
        };
        let fetch_client = Arc::new(ProxiedFetchClient::new(None, DEV_INSTANCE_NAME.to_owned()));
        let (log_line_sender, mut log_line_receiver) = mpsc::unbounded_channel();
        let routed_path = RoutedHttpPath(http_request.head.url.path().to_string());
        let outcome = self
            .isolate
            .execute_http_action(
                validated_path,
                routed_path,
                http_request,
                identity,
                app.clone(),
                fetch_client,
                log_line_sender,
                http_response_streamer,
                tx,
                ExecutionContext::new_for_test(),
                self.environment_data.clone(),
                DEV_INSTANCE_NAME.to_string(),
                None,
            )
            .await?;
        // Collect everything the action logged, in arrival order.
        let mut collected = vec![];
        while let Some(line) = log_line_receiver.recv().await {
            collected.push(line);
        }
        Ok((outcome.result, collected.into()))
    }

    /// Runs an action as the system identity and returns its value.
    pub async fn action(&self, udf_path: &str, args: ConvexObject) -> anyhow::Result<ConvexValue> {
        let system = Identity::system();
        self.action_with_identity(udf_path, args, system).await
    }

    /// Runs an action that is expected to fail and returns its `JsError`.
    ///
    /// # Panics
    ///
    /// Panics if the action succeeds.
    pub async fn action_js_error(
        &self,
        udf_path: &str,
        args: ConvexObject,
    ) -> anyhow::Result<JsError> {
        let wrapped_args = vec![ConvexValue::Object(args)];
        let (action_outcome, _logs) = self
            .raw_action(udf_path, wrapped_args, Identity::system())
            .await?;
        Ok(action_outcome.result.unwrap_err())
    }

    /// Runs an action as the system identity and returns its log lines.
    pub async fn action_log_lines(
        &self,
        udf_path: &str,
        args: ConvexObject,
    ) -> anyhow::Result<LogLines> {
        let (_, _, lines) = self
            .action_outcome_and_log_lines(udf_path, args, Identity::system())
            .await?;
        Ok(lines)
    }

    /// Runs an action as `identity` and returns its value.
    pub async fn action_with_identity(
        &self,
        udf_path: &str,
        args: ConvexObject,
        identity: Identity,
    ) -> anyhow::Result<ConvexValue> {
        let (returned, _outcome) = self.action_outcome(udf_path, args, identity).await?;
        Ok(returned)
    }

    /// Runs an action as `identity` and returns its value together with the
    /// full `ActionOutcome`.
    pub async fn action_outcome(
        &self,
        udf_path: &str,
        args: ConvexObject,
        identity: Identity,
    ) -> anyhow::Result<(ConvexValue, ActionOutcome)> {
        let (value, outcome, _) = self
.action_outcome_and_log_lines(udf_path, args, identity)
            .await?;
        Ok((value, outcome))
    }

    /// Runs an action and returns its unpacked value, the full
    /// `ActionOutcome`, and the log lines it emitted.
    ///
    /// # Errors
    ///
    /// Returns an error if the action failed with a user (`JsError`) error;
    /// use `action_js_error` or `raw_action` when a failure is expected.
    pub async fn action_outcome_and_log_lines(
        &self,
        udf_path: &str,
        args: ConvexObject,
        identity: Identity,
    ) -> anyhow::Result<(ConvexValue, ActionOutcome, LogLines)> {
        let (outcome, log_lines) = self
            .raw_action(udf_path, vec![ConvexValue::Object(args)], identity)
            .await?;
        // Propagate the descriptive error instead of panicking with it: this
        // function already returns `anyhow::Result`.
        let value = outcome
            .result
            .as_ref()
            .map(|v| v.unpack())
            .map_err(|e| {
                anyhow::anyhow!(
                    "action failed with user error. If that is intended, call action_js_error or \
                     raw_action instead. {e:?}"
                )
            })?;
        Ok((value, outcome, log_lines))
    }

    /// Validates and runs an action, returning the raw `ActionOutcome` and
    /// the log lines it emitted.
    ///
    /// A validation failure is turned into an outcome via
    /// `ActionOutcome::from_error`, with no log lines, without running the
    /// function.
    pub async fn raw_action(
        &self,
        udf_path: &str,
        args: Vec<ConvexValue>,
        identity: Identity,
    ) -> anyhow::Result<(ActionOutcome, LogLines)> {
        let mut tx = self.database.begin(identity.clone()).await?;
        let path = ComponentFunctionPath {
            component: ComponentPath::test_user(),
            udf_path: udf_path.parse()?,
        };
        let canonicalized_path = path.canonicalize();
        let args_array = ConvexArray::try_from(args)?;
        let validated_path_or_err = ValidatedPathAndArgs::new(
            AllowedVisibility::PublicOnly,
            &mut tx,
            PublicFunctionPath::Component(canonicalized_path.clone()),
            args_array.clone(),
            UdfType::Action,
        )
        .await?;
        let path_and_args = match validated_path_or_err {
            Err(js_error) => {
                return Ok((
                    ActionOutcome::from_error(
                        js_error,
                        canonicalized_path,
                        args_array,
                        identity.into(),
                        self.rt.clone(),
                        None,
                    ),
                    vec![].into(),
                ))
            },
            Ok(path_and_args) => path_and_args,
        };
        let fetch_client = Arc::new(ProxiedFetchClient::new(None, DEV_INSTANCE_NAME.to_owned()));
        let (log_line_sender, mut log_line_receiver) = mpsc::unbounded_channel();
        // TODO(presley): Make this also be able to use local executor.
let outcome = self .isolate .execute_action( path_and_args, tx, Arc::new(self.clone()), fetch_client, log_line_sender, ExecutionContext::new_for_test(), self.environment_data.clone(), DEV_INSTANCE_NAME.to_string(), None, ) .await?; let mut log_lines = vec![]; while let Some(log_line) = log_line_receiver.recv().await { log_lines.push(log_line); } Ok((outcome, log_lines.into())) } } pub static DEFAULT_CONFIG: LazyLock<UdfTestConfig> = LazyLock::new(|| UdfTestConfig { isolate_config: IsolateConfig::default(), udf_server_version: Version::parse("1000.0.0").unwrap(), }); pub static DEFAULT_MAX_ISOLATE_WORKERS: usize = 2; #[derive(Clone)] pub struct UdfTestConfig { pub isolate_config: IsolateConfig, pub udf_server_version: Version, } /// Rust can't seem to determine the type of the function argument in these /// tests, so we specify it explicitly. pub type UdfTestType = UdfTest<TestRuntime, TestPersistence>; impl<RT: Runtime> UdfTest<RT, TestPersistence> { pub async fn run_test_with_isolate<F, Fut>(rt: RT, mut f: F) -> anyhow::Result<()> where F: FnMut(Self) -> Fut, Fut: Future<Output = anyhow::Result<()>>, { let test = Self::default(rt.clone()).await?; f(test).await?; Ok(()) } pub async fn run_test_with_isolate2<F, Fut>(rt: RT, mut f: F) -> anyhow::Result<()> where F: FnMut(Self) -> Fut, Fut: Future<Output = anyhow::Result<()>>, { let test = Self::default(rt.clone()).await?; f(test).await.context("test failed on isolate1")?; let mut test = Self::default(rt.clone()).await?; test.enable_isolate_v2(); f(test).await.context("test failed on isolate2")?; Ok(()) } pub async fn default(rt: RT) -> anyhow::Result<Self> { Self::default_with_config(DEFAULT_CONFIG.clone(), DEFAULT_MAX_ISOLATE_WORKERS, rt).await } pub async fn default_with_config( config: UdfTestConfig, max_isolate_workers: usize, rt: RT, ) -> anyhow::Result<Self> { let result = Self::new( TEST_SOURCE_ISOLATE_ONLY.clone(), rt, Arc::new(TestPersistence::new()), config, max_isolate_workers, ) .await? 
.expect("Unexpected JSError"); Ok(result) } pub async fn default_with_modules( modules: Vec<ModuleConfig>, rt: RT, ) -> anyhow::Result<Result<Self, JsError>> { Self::new( modules, rt, Arc::new(TestPersistence::new()), DEFAULT_CONFIG.clone(), DEFAULT_MAX_ISOLATE_WORKERS, ) .await } pub async fn with_timeout(rt: RT, timeout: Option<Duration>) -> anyhow::Result<Self> { let result = Self::new( TEST_SOURCE_ISOLATE_ONLY.clone(), rt, Arc::new(TestPersistence::new()), UdfTestConfig { isolate_config: IsolateConfig::new_with_max_user_timeout( "test", timeout, ConcurrencyLimiter::unlimited(), ), udf_server_version: "1000.0.0".parse().unwrap(), }, DEFAULT_MAX_ISOLATE_WORKERS, ) .await? .expect("Unexpected JSError"); Ok(result) } } #[async_trait] impl<RT: Runtime, P: Persistence> ActionCallbacks for UdfTest<RT, P> { async fn execute_query( &self, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, _context: ExecutionContext, ) -> anyhow::Result<FunctionResult> { let arguments = parse_udf_args(&path.udf_path, args.into_args()?)?; let str_name = String::from(path.udf_path); let (outcome, _) = self .raw_query(&str_name, arguments.into(), identity, None) .await?; Ok(FunctionResult { result: outcome.result, }) } async fn execute_mutation( &self, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, _context: ExecutionContext, ) -> anyhow::Result<FunctionResult> { let arguments = parse_udf_args(&path.udf_path, args.into_args()?)?; let str_name = String::from(path.udf_path); let outcome = self .raw_mutation(&str_name, arguments.into(), identity) .await?; Ok(FunctionResult { result: outcome.result, }) } async fn execute_action( &self, identity: Identity, path: CanonicalizedComponentFunctionPath, args: SerializedArgs, _context: ExecutionContext, ) -> anyhow::Result<FunctionResult> { let arguments = parse_udf_args(&path.udf_path, args.into_args()?)?; let str_name = String::from(path.udf_path); let (outcome, _) = self 
.raw_action(&str_name, arguments.into(), identity) .await?; Ok(FunctionResult { result: outcome.result, }) } async fn storage_get_url( &self, identity: Identity, component: ComponentId, storage_id: FileStorageId, ) -> anyhow::Result<Option<String>> { let mut tx = self.database.begin(identity).await?; self.file_storage .get_url(&mut tx, component, storage_id) .await } async fn storage_get_file_entry( &self, identity: Identity, component: ComponentId, storage_id: FileStorageId, ) -> anyhow::Result<Option<(ComponentPath, FileStorageEntry)>> { let mut tx = self.database.begin(identity).await?; let entry = self .file_storage .get_file_entry(&mut tx, component.into(), storage_id) .await?; Ok(entry.map(|e| (ComponentPath::root(), e))) } async fn storage_store_file_entry( &self, identity: Identity, component: ComponentId, entry: FileStorageEntry, ) -> anyhow::Result<(ComponentPath, DeveloperDocumentId)> { let mut tx = self.database.begin(identity).await?; let id = self .file_storage .store_file_entry(&mut tx, component.into(), entry) .await?; self.database.commit(tx).await?; Ok((ComponentPath::root(), id)) } async fn storage_delete( &self, identity: Identity, component: ComponentId, storage_id: FileStorageId, ) -> anyhow::Result<()> { let mut tx = self.database.begin(identity).await?; self.file_storage .delete(&mut tx, component.into(), storage_id.clone()) .await?; self.database.commit(tx).await?; Ok(()) } async fn schedule_job( &self, identity: Identity, scheduling_component: ComponentId, scheduled_path: CanonicalizedComponentFunctionPath, udf_args: SerializedArgs, scheduled_ts: UnixTimestamp, context: ExecutionContext, ) -> anyhow::Result<DeveloperDocumentId> { let mut tx: database::Transaction<RT> = self.database.begin(identity).await?; let (scheduled_path, udf_args) = validate_schedule_args( scheduled_path, udf_args.into_args()?, scheduled_ts, // Scheduling from actions is not transactional and happens at latest // timestamp. 
self.database.runtime().unix_timestamp(), &mut tx, ) .await?; let virtual_id = VirtualSchedulerModel::new(&mut tx, scheduling_component.into()) .schedule(scheduled_path, udf_args, scheduled_ts, context) .await?; self.database.commit(tx).await?; Ok(virtual_id) } async fn cancel_job( &self, identity: Identity, virtual_id: DeveloperDocumentId, ) -> anyhow::Result<()> { let mut tx = self.database.begin(identity).await?; VirtualSchedulerModel::new(&mut tx, ComponentId::test_user().into()) .cancel(virtual_id) .await?; self.database.commit(tx).await?; Ok(()) } async fn vector_search( &self, identity: Identity, query: JsonValue, ) -> anyhow::Result<(Vec<PublicVectorSearchQueryResult>, FunctionUsageStats)> { let query = VectorSearch::try_from(query)?; self.database.vector_search(identity, query).await } async fn lookup_function_handle( &self, identity: Identity, handle: FunctionHandle, ) -> anyhow::Result<CanonicalizedComponentFunctionPath> { let mut tx = self.database.begin(identity).await?; FunctionHandlesModel::new(&mut tx).lookup(handle).await } async fn create_function_handle( &self, identity: Identity, path: CanonicalizedComponentFunctionPath, ) -> anyhow::Result<FunctionHandle> { let mut tx = self.database.begin(identity).await?; FunctionHandlesModel::new(&mut tx) .get_with_component_path(path) .await } } /// Create a bogus UDF request for testing. Should only be used for tests /// that don't depend on UDF execution succeeding, like scheduler tests. 
pub async fn bogus_udf_request<RT: Runtime>( db: &Database<RT>, client_id: &str, sender: oneshot::Sender<anyhow::Result<(Transaction<RT>, FunctionOutcome)>>, ) -> anyhow::Result<Request<RT>> { let tx = db.begin_system().await?; let request = UdfRequest { path_and_args: ValidatedPathAndArgs::new_for_tests( "path.js:default".parse()?, ConvexArray::empty(), None, ), udf_type: UdfType::Query, transaction: tx, journal: QueryJournal::new(), context: ExecutionContext::new_for_test(), }; let inner = RequestType::Udf { request, environment_data: test_environment_data(db.runtime().clone())?, response: sender, queue_timer: queue_timer(), udf_callback: Box::new(BogusUdfCallback), reactor_depth: 0, function_started_sender: None, }; Ok(Request { client_id: client_id.to_string(), inner, parent_trace: EncodedSpan::empty(), }) } struct BogusUdfCallback; #[async_trait] impl<RT: Runtime> UdfCallback<RT> for BogusUdfCallback { async fn execute_udf( &self, _client_id: String, _udf_type: UdfType, _path_and_args: ValidatedPathAndArgs, _environment_data: EnvironmentData<RT>, _transaction: Transaction<RT>, _journal: QueryJournal, _context: ExecutionContext, _reactor_depth: usize, ) -> anyhow::Result<(Transaction<RT>, FunctionOutcome)> { anyhow::bail!("BogusUdfCallback called") } } pub async fn test_isolate_recreated_with_client_change<RT: Runtime, W: IsolateWorker<RT>>( rt: RT, worker: W, hold_guard: HoldGuard, ) -> anyhow::Result<()> { initialize_v8(); let mut wait_for_blocked = hold_guard.wait_for_blocked().boxed(); let heap_stats = SharedIsolateHeapStats::new(); let (work_sender, work_receiver) = mpsc::channel(1); let _handle = rt.spawn_thread("isolate", move || { worker.service_requests::<Option<usize>>(work_receiver, heap_stats) }); let DbFixtures { db, .. 
} = DbFixtures::new(&rt).await?; let (done_sender, done_receiver) = oneshot::channel(); let (sender, _rx) = oneshot::channel(); let request = bogus_udf_request(&db, "carnitas", sender).await?; work_sender.try_send((request, done_sender, None)).unwrap(); let mut done_receiver = done_receiver.boxed(); // First request should not recreate isolate. select! { _ = done_receiver.as_mut().fuse() => { Ok(()) }, _ = wait_for_blocked.as_mut().fuse() => { Err(anyhow::anyhow!("recreated isolate on the first request")) } }?; // Second request with different client_id should recreate isolate. let (done_sender, done_receiver) = oneshot::channel(); let (sender, _rx) = oneshot::channel(); let request = bogus_udf_request(&db, "alpastor", sender).await?; work_sender.try_send((request, done_sender, None)).unwrap(); let mut done_receiver = done_receiver.boxed(); loop { select! { _ = done_receiver.as_mut().fuse() => { anyhow::bail!("Should have recreated isolate on the second request"); }, pause_guard = wait_for_blocked.as_mut().fuse() => { if let Some(pause_guard) = pause_guard { drop(done_receiver); pause_guard.unpause(); drop(wait_for_blocked); break; } } } } Ok(()) } pub async fn test_isolate_not_recreated_with_same_client<RT: Runtime, W: IsolateWorker<RT>>( rt: RT, worker: W, hold_guard: HoldGuard, ) -> anyhow::Result<()> { initialize_v8(); let mut wait_for_blocked = hold_guard.wait_for_blocked().boxed(); let heap_stats = SharedIsolateHeapStats::new(); let (work_sender, work_receiver) = mpsc::channel(1); let _handle = rt.spawn_thread("isolate", move || { worker.service_requests::<Option<usize>>(work_receiver, heap_stats) }); let DbFixtures { db, .. 
} = DbFixtures::new(&rt).await?; let (done_sender, done_receiver) = oneshot::channel(); let (sender, _rx) = oneshot::channel(); let request = bogus_udf_request(&db, "carnitas", sender).await?; work_sender.try_send((request, done_sender, None)).unwrap(); let mut done_receiver = done_receiver.boxed(); // First request should not recreate isolate. select! { _ = done_receiver.as_mut().fuse() => { Ok(()) }, _ = wait_for_blocked.as_mut().fuse() => { Err(anyhow::anyhow!("recreated isolate on the first request")) } }?; // Second request with the same client_id should not recreate isolate. let (done_sender, done_receiver) = oneshot::channel(); let (sender, _rx) = oneshot::channel(); let request = bogus_udf_request(&db, "carnitas", sender).await?; work_sender.try_send((request, done_sender, None)).unwrap(); let mut done_receiver = done_receiver.boxed(); select! { _ = done_receiver.as_mut().fuse() => { Ok(()) }, _ = wait_for_blocked.as_mut().fuse() => { Err(anyhow::anyhow!("recreated isolate on the second request with the same client id")) } }?; Ok(()) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.