Skip to main content
Glama

Convex MCP server

Official
by get-convex
test_helpers.rs15.3 kB
use std::{ collections::BTreeMap, fs::File, io::Read, path::Path, sync::{ Arc, LazyLock, }, time::Duration, }; use anyhow::Context; use async_trait::async_trait; use authentication::{ access_token_auth::NullAccessTokenAuth, application_auth::ApplicationAuth, }; use cmd_util::env::config_test; use common::{ bootstrap_model::index::{ database_index::IndexedFields, IndexMetadata, }, components::ComponentId, db_schema, http::fetch::StaticFetchClient, knobs::{ ACTION_USER_TIMEOUT, UDF_CACHE_MAX_SIZE, }, persistence::Persistence, runtime::{ new_unlimited_rate_limiter, Runtime, }, shutdown::ShutdownSignal, testing::TestPersistence, types::{ ConvexOrigin, FullyQualifiedObjectKey, }, }; use database::{ Database, IndexModel, SchemaModel, Transaction, }; use events::usage::{ NoOpUsageEventLogger, UsageEventLogger, }; use exports::interface::InProcessExportProvider; use file_storage::{ FileStorage, TransactionalFileStorage, }; use function_runner::{ in_process_function_runner::InProcessFunctionRunner, server::InstanceStorage, }; use isolate::{ bundled_js::OUT_DIR, test_helpers::{ TEST_SOURCE, TEST_SOURCE_ISOLATE_ONLY, }, }; use keybroker::{ Identity, KeyBroker, DEV_INSTANCE_NAME, DEV_SECRET, }; use model::{ config::{ types::ConfigMetadata, ConfigModel, }, cron_jobs::types::CronJob, database_globals::types::StorageTagInitializer, exports::{ types::{ Export, ExportFormat, ExportRequestor, }, ExportsModel, }, initialize_application_system_tables, scheduled_jobs::types::ScheduledJob, udf_config::types::UdfConfig, virtual_system_mapping, }; use node_executor::{ noop::NoopNodeExecutor, Actions, NodeExecutor, }; use storage::Storage; use value::{ ResolvedDocumentId, TableName, TableNamespace, }; use crate::{ cache::QueryCache, cron_jobs::CronJobContext, deploy_config::{ FinishPushDiff, SchemaStatus, StartPushRequest, StartPushResponse, }, log_visibility::RedactLogsToClient, scheduled_jobs::ScheduledJobContext, Application, }; pub static OBJECTS_TABLE: LazyLock<TableName> = LazyLock::new(|| "objects".parse().unwrap()); pub static OBJECTS_TABLE_COMPONENT: ComponentId = ComponentId::test_user(); #[derive(Default)] pub struct ApplicationFixtureArgs { pub tp: Option<TestPersistence>, pub event_logger: Option<Arc<dyn UsageEventLogger>>, pub node_executor: Option<Arc<dyn NodeExecutor>>, } impl ApplicationFixtureArgs { pub fn with_event_logger(event_logger: Arc<dyn UsageEventLogger>) -> Self { Self { event_logger: Some(event_logger), ..Default::default() } } pub fn with_node_executor(node_executor: Arc<dyn NodeExecutor>) -> Self { Self { node_executor: Some(node_executor), ..Default::default() } } } #[async_trait] pub trait ApplicationTestExt<RT: Runtime> { async fn new_for_tests(rt: &RT) -> anyhow::Result<Application<RT>>; async fn new_for_tests_with_args( rt: &RT, args: ApplicationFixtureArgs, ) -> anyhow::Result<Application<RT>>; async fn test_one_off_scheduled_job_executor_run( &self, job: ScheduledJob, job_id: ResolvedDocumentId, ) -> anyhow::Result<()>; /// Load the modules from npm-packages/udf-tests async fn load_udf_tests_modules(&self) -> anyhow::Result<()>; async fn load_udf_tests_modules_with_node(&self) -> anyhow::Result<()>; /// Load the modules form npm-packages/component-tests async fn load_component_tests_modules(&self, layout: &str) -> anyhow::Result<()>; async fn start_push_for_layout(&self, layout: &str) -> anyhow::Result<StartPushResponse>; async fn start_push_from_request( &self, request: StartPushRequest, ) -> anyhow::Result<StartPushResponse>; async fn run_test_push(&self, request: StartPushRequest) -> anyhow::Result<FinishPushDiff>; async fn test_one_off_cron_job_executor_run(&self, job: CronJob) -> anyhow::Result<()>; fn validate_user_defined_index_fields( &self, fields: IndexedFields, ) -> anyhow::Result<IndexedFields>; fn database(&self) -> &Database<RT>; fn snapshot_imports_storage(&self) -> Arc<dyn Storage>; fn exports_storage(&self) -> Arc<dyn Storage>; async fn export_and_wait(&self) -> anyhow::Result<FullyQualifiedObjectKey>; async fn add_index( &self, index: IndexMetadata<TableName>, ) -> anyhow::Result<ResolvedDocumentId>; } #[async_trait] impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> { async fn new_for_tests(rt: &RT) -> anyhow::Result<Application<RT>> { Self::new_for_tests_with_args(rt, Default::default()).await } async fn new_for_tests_with_args( rt: &RT, args: ApplicationFixtureArgs, ) -> anyhow::Result<Application<RT>> { config_test(); let kb = KeyBroker::dev(); let storage_dir = tempfile::TempDir::new()?; let convex_origin: ConvexOrigin = "http://127.0.0.1:8000".into(); let convex_site = "http://127.0.0.1:8001".into(); let searcher = Arc::new(search::searcher::SearcherStub {}); let segment_term_metadata_fetcher = Arc::new(search::searcher::SearcherStub {}); let persistence = args.tp.unwrap_or_else(TestPersistence::new); let database = Database::load( Arc::new(persistence.clone()), rt.clone(), searcher.clone(), ShutdownSignal::panic(), virtual_system_mapping().clone(), args.event_logger.unwrap_or(Arc::new(NoOpUsageEventLogger)), // Essentially unlimited rate limit for testing Arc::new(new_unlimited_rate_limiter(rt.clone())), ) .await?; initialize_application_system_tables(&database).await?; let application_storage = Application::initialize_storage( rt.clone(), &database, StorageTagInitializer::Local { dir: storage_dir.path().to_path_buf(), }, DEV_INSTANCE_NAME.into(), ) .await?; let fetch_client = Arc::new(StaticFetchClient::new()); let function_runner = Arc::new( InProcessFunctionRunner::new( DEV_INSTANCE_NAME.into(), DEV_SECRET.try_into()?, convex_origin.clone(), rt.clone(), persistence.reader(), InstanceStorage { files_storage: application_storage.files_storage.clone(), modules_storage: application_storage.modules_storage.clone(), }, database.clone(), fetch_client.clone(), ) .await?, ); let file_storage = FileStorage { transactional_file_storage: TransactionalFileStorage::new( rt.clone(), application_storage.files_storage.clone(), convex_origin.clone(), ), database: database.clone(), }; let node_executor = args .node_executor .unwrap_or_else(|| Arc::new(NoopNodeExecutor::new())); let actions = Actions::new( node_executor, convex_origin.clone(), *ACTION_USER_TIMEOUT, rt.clone(), ); let application = Application::new( rt.clone(), database.clone(), file_storage.clone(), application_storage, database.usage_counter(), kb.clone(), DEV_INSTANCE_NAME.into(), function_runner, convex_origin, convex_site, searcher, segment_term_metadata_fetcher, Arc::new(persistence.clone()), actions, Arc::new(RedactLogsToClient::new(false)), Arc::new(ApplicationAuth::new( kb.clone(), Arc::new(NullAccessTokenAuth), )), QueryCache::new(*UDF_CACHE_MAX_SIZE), fetch_client, None, // local_log_sink ShutdownSignal::panic(), Arc::new(InProcessExportProvider), ) .await?; Ok(application) } fn snapshot_imports_storage(&self) -> Arc<dyn Storage> { self.application_storage.snapshot_imports_storage.clone() } fn exports_storage(&self) -> Arc<dyn Storage> { self.application_storage.exports_storage.clone() } async fn test_one_off_scheduled_job_executor_run( &self, job: ScheduledJob, job_id: ResolvedDocumentId, ) -> anyhow::Result<()> { let test_executor = ScheduledJobContext::new( self.runtime.clone(), self.database.clone(), self.runner.clone(), self.function_log.clone(), ); test_executor.execute_job(job, job_id).await; Ok(()) } async fn test_one_off_cron_job_executor_run(&self, job: CronJob) -> anyhow::Result<()> { let test_executor = CronJobContext::new( self.runtime.clone(), self.instance_name.clone(), self.database.clone(), self.runner.clone(), self.function_log.clone(), ); test_executor.execute_job(job).await; Ok(()) } async fn load_udf_tests_modules(&self) -> anyhow::Result<()> { self.load_udf_tests_modules_inner(false).await } async fn load_udf_tests_modules_with_node(&self) -> anyhow::Result<()> { self.load_udf_tests_modules_inner(true).await } async fn load_component_tests_modules(&self, layout: &str) -> anyhow::Result<()> { let request = Self::load_start_push_request(Path::new(layout))?; self.run_test_push(request).await?; Ok(()) } async fn start_push_for_layout(&self, layout: &str) -> anyhow::Result<StartPushResponse> { let request = Self::load_start_push_request(Path::new(layout))?; self.start_push_from_request(request).await } async fn start_push_from_request( &self, request: StartPushRequest, ) -> anyhow::Result<StartPushResponse> { let config = request.into_project_config()?; self.start_push(&config).await } async fn run_test_push(&self, request: StartPushRequest) -> anyhow::Result<FinishPushDiff> { let start_push = self.start_push_from_request(request).await?; loop { let schema_status = self .wait_for_schema( Identity::system(), start_push.schema_change.clone(), Duration::from_secs(10), ) .await?; match schema_status { SchemaStatus::InProgress { .. } => continue, SchemaStatus::Complete => break, _ => anyhow::bail!("Unexpected schema status: {schema_status:?}"), } } let diff = self.finish_push(Identity::system(), start_push).await?; Ok(diff) } fn validate_user_defined_index_fields( &self, fields: IndexedFields, ) -> anyhow::Result<IndexedFields> { self._validate_user_defined_index_fields(fields) } fn database(&self) -> &Database<RT> { &self.database } async fn export_and_wait(&self) -> anyhow::Result<FullyQualifiedObjectKey> { let export_id = self .request_export( Identity::system(), ExportFormat::Zip { include_storage: true, }, ComponentId::Root, ExportRequestor::CloudBackup, None, ) .await?; let export_object_key = loop { let mut tx = self.begin(Identity::system()).await?; let export_doc = ExportsModel::new(&mut tx) .get(export_id) .await? .context("Missing?")? .into_value(); let Export::Completed { zip_object_key, .. } = export_doc else { continue; }; break zip_object_key; }; Ok(self.cloud_export_key(export_object_key)) } async fn add_index( &self, index: IndexMetadata<TableName>, ) -> anyhow::Result<ResolvedDocumentId> { let mut tx = self.begin(Identity::system()).await?; let index_id = IndexModel::new(&mut tx) .add_application_index(TableNamespace::test_user(), index) .await?; self.commit_test(tx).await?; Ok(index_id) } } impl<RT: Runtime> Application<RT> { pub fn load_start_push_request(layout_path: &Path) -> anyhow::Result<StartPushRequest> { let path = Path::new(OUT_DIR) .join(layout_path) .join("start_push_request.json"); let mut file = File::open(path)?; let mut contents = vec![]; file.read_to_end(&mut contents)?; let output: StartPushRequest = serde_json::from_slice(&contents)?; Ok(output) } async fn load_udf_tests_modules_inner(&self, include_node: bool) -> anyhow::Result<()> { let test_source = if include_node { TEST_SOURCE.clone() } else { TEST_SOURCE_ISOLATE_ONLY.clone() }; let mut tx = self.begin(Identity::system()).await?; let udf_config = UdfConfig::new_for_test(&self.runtime(), "1000.0.0".parse()?); // TODO(rakeeb): add external packages to udf test modules let source_package = self.upload_package(&test_source, None, None).await?; let analyze_results = self .analyze( udf_config.clone(), test_source.clone(), source_package.clone(), BTreeMap::new(), BTreeMap::new(), ) .await??; let schema_id = insert_validated_schema(&mut tx).await?; ConfigModel::new(&mut tx, ComponentId::test_user()) .apply( ConfigMetadata::new(), test_source.clone(), udf_config, Some(source_package), analyze_results, Some(schema_id), ) .await?; self.commit_test(tx).await?; Ok(()) } } // The contents of the schema are irrelevant for the modules, but we need one to // be present for the rest of apply_config. async fn insert_validated_schema<RT: Runtime>( tx: &mut Transaction<RT>, ) -> anyhow::Result<ResolvedDocumentId> { let schema = db_schema!(); let mut model = SchemaModel::new(tx, TableNamespace::test_user()); let (schema_id, _) = model.submit_pending(schema).await?; model.mark_validated(schema_id).await?; Ok(schema_id) }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server