Skip to main content
Glama
lib.rs46.2 kB
//! This crate provides a harness for running dal integration tests as well as helpers and resources
//! when doing so.

#![recursion_limit = "256"]
#![warn(
    bad_style,
    clippy::expect_used,
    clippy::panic,
    clippy::unwrap_used,
    dead_code,
    improper_ctypes,
    missing_debug_implementations,
    missing_docs,
    no_mangle_generic_items,
    non_shorthand_field_patterns,
    overflowing_literals,
    path_statements,
    patterns_in_fns_without_body,
    unconditional_recursion,
    unreachable_pub,
    unused,
    unused_allocation,
    unused_comparisons,
    unused_parens,
    while_true
)]

use std::{
    borrow::Cow,
    env,
    fmt,
    future::IntoFuture,
    path::{
        Path,
        PathBuf,
    },
    str::FromStr,
    sync::{
        Arc,
        Once,
    },
};

use audit_database::AuditDatabaseContext;
use buck2_resources::Buck2Resources;
use dal::{
    DalContext,
    DalLayerDb,
    JetstreamStreams,
    ServicesContext,
    Workspace,
    builtins::func,
    feature_flags::FeatureFlagService,
    job::processor::{
        JobQueueProcessor,
        NatsProcessor,
    },
};
use derive_builder::Builder;
use jwt_simple::prelude::RS256KeyPair;
use lazy_static::lazy_static;
use si_crypto::{
    SymmetricCryptoService,
    SymmetricCryptoServiceConfig,
    SymmetricCryptoServiceConfigFile,
    VeritechEncryptionKey,
};
use si_data_nats::{
    NatsClient,
    NatsConfig,
    jetstream,
};
use si_data_pg::{
    PgPool,
    PgPoolConfig,
};
use si_jwt_public_key::{
    JwtAlgo,
    JwtConfig,
    JwtPublicSigningKeyChain,
};
use si_layer_cache::hybrid_cache::CacheConfig;
use si_runtime::DedicatedExecutor;
use si_std::{
    CanonicalFile,
    ResultExt,
};
use si_tls::CertificateSource;
use telemetry::prelude::*;
use tokio::{
    fs::File,
    io::AsyncReadExt,
    sync::Mutex,
};
use tokio_util::{
    sync::CancellationToken,
    task::TaskTracker,
};
use uuid::Uuid;

pub mod expand_helpers;
pub mod expected;
pub mod helpers;

pub mod prelude {
    //! This module provides a standard set of tools for authoring DAL integration tests.
    pub use color_eyre::{
        Result,
        eyre::OptionExt,
    };

    pub use crate::{
        WorkspaceSignup,
        helpers::ChangeSetTestHelpers,
    };
}

mod signup;
/// Schema variants the test harness expects to be installed
pub mod test_exclusive_schemas;

pub use color_eyre::{
    self,
    eyre::{
        Report,
        Result,
        WrapErr,
        eyre,
    },
};
pub use si_test_macros::{
    dal_test as test,
    sdf_test,
};
pub use signup::WorkspaceSignup;
pub use telemetry;
pub use test_exclusive_schemas::{
    SCHEMA_ID_BAD_VALIDATIONS,
    SCHEMA_ID_DUMMY_SECRET,
    SCHEMA_ID_ETOILES,
    SCHEMA_ID_FAKE_BUTANE,
    SCHEMA_ID_FAKE_DOCKER_IMAGE,
    SCHEMA_ID_FALLOUT,
    SCHEMA_ID_KATY_PERRY,
    SCHEMA_ID_LARGE_EVEN_LEGO,
    SCHEMA_ID_LARGE_ODD_LEGO,
    SCHEMA_ID_MEDIUM_EVEN_LEGO,
    SCHEMA_ID_MEDIUM_ODD_LEGO,
    SCHEMA_ID_MORNINGSTAR,
    SCHEMA_ID_PET_SHOP,
    SCHEMA_ID_PIRATE,
    SCHEMA_ID_PRIVATE_LANGUAGE,
    SCHEMA_ID_SMALL_EVEN_LEGO,
    SCHEMA_ID_SMALL_ODD_LEGO,
    SCHEMA_ID_STARFIELD,
    SCHEMA_ID_SWIFTY,
    SCHEMA_ID_VALIDATED_INPUT,
    SCHEMA_ID_VALIDATED_OUTPUT,
};
pub use tracing_subscriber;

// Defaults used when the corresponding `SI_TEST_*` environment variable is unset.
const DEFAULT_TEST_PG_USER: &str = "si_test";
const DEFAULT_TEST_PG_PORT_STR: &str = "6432";
const DEFAULT_TEST_MODULE_INDEX_URL: &str = "http://localhost:5157";

// Environment variables (all prefixed with `SI_TEST_`) that override test configuration.
const ENV_VAR_NATS_URL: &str = "SI_TEST_NATS_URL";
const ENV_VAR_MODULE_INDEX_URL: &str = "SI_TEST_MODULE_INDEX_URL";
const ENV_VAR_PG_HOSTNAME: &str = "SI_TEST_PG_HOSTNAME";
const ENV_VAR_PG_DBNAME: &str = "SI_TEST_PG_DBNAME";
const ENV_VAR_PG_USER: &str = "SI_TEST_PG_USER";
const ENV_VAR_PG_PORT: &str = "SI_TEST_PG_PORT";
const ENV_VAR_KEEP_OLD_DBS: &str = "SI_TEST_KEEP_OLD_DBS";
const ENV_VAR_LAYER_CACHE_PG_DBNAME: &str = "SI_TEST_LAYER_CACHE_PG_DBNAME";
const ENV_VAR_AUDIT_PG_DBNAME: &str = "SI_TEST_AUDIT_PG_DBNAME";
const ENV_VAR_S3_ENDPOINT: &str = "SI_TEST_S3_ENDPOINT";
const ENV_VAR_S3_BUCKET_PREFIX: &str = "SI_TEST_S3_BUCKET_PREFIX";
const ENV_VAR_S3_REGION: &str = "SI_TEST_S3_REGION";
const ENV_VAR_S3_ACCESS_KEY: &str = "SI_TEST_S3_ACCESS_KEY";
const ENV_VAR_S3_SECRET_KEY: &str = "SI_TEST_S3_SECRET_KEY";
const ENV_VAR_PERSISTER_MODE: &str = "SI_TEST_PERSISTER_MODE";

#[allow(missing_docs)]
pub static COLOR_EYRE_INIT: Once = Once::new();

lazy_static! {
    // Process-wide, lazily-initialized setup state shared by every test in this binary.
    // Guarded by an async-aware `tokio::sync::Mutex` so only the first caller performs
    // global database creation/migration (see `TestContext::global`).
    static ref TEST_CONTEXT_BUILDER: Mutex<ContextBuilderState> = Mutex::new(Default::default());
}

/// A [`DalContext`] for a workspace in a visibility which is not in a change set
///
/// To use a borrowed `DalContext` version, use [`DalContextHeadRef`].
/// To use mutably borrowed `DalContext` version, use [`DalContextHeadMutRef`].
pub struct DalContextHead(pub DalContext);

impl fmt::Debug for DalContextHead {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Inner `DalContext` is intentionally not printed.
        f.debug_struct("DalContextHead").finish_non_exhaustive()
    }
}

/// A reference to a [`DalContext`] for a workspace in a visibility which is not in a change
/// set
///
/// To use an owned `DalContext` version, use [`DalContextHead`].
/// To use mutably borrowed `DalContext` version, use [`DalContextHeadMutRef`].
pub struct DalContextHeadRef<'a>(pub &'a DalContext);

impl fmt::Debug for DalContextHeadRef<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DalContextHeadRef").finish_non_exhaustive()
    }
}

/// A mutable reference to a [`DalContext`] for a workspace in a visibility which is not in a
/// change set
///
/// To use an owned `DalContext` version, use [`DalContextHead`].
/// To use a borrowed `DalContext` version, use [`DalContextHeadRef`].
pub struct DalContextHeadMutRef<'a>(pub &'a mut DalContext);

impl fmt::Debug for DalContextHeadMutRef<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DalContextHeadMutRef")
            .finish_non_exhaustive()
    }
}

/// An authentication token, used when making SDF API requests
#[derive(Debug)]
pub struct AuthToken(pub String);

/// A reference to an authentication token, used when making SDF API requests
#[derive(Debug)]
pub struct AuthTokenRef<'a>(pub &'a str);

#[allow(missing_docs)]
#[derive(Builder, Clone, Debug)]
pub struct Config {
    #[builder(default = "PgPoolConfig::default()")]
    pg: PgPoolConfig,
    #[builder(default = "NatsConfig::default()")]
    nats: NatsConfig,
    #[builder(default = "DEFAULT_TEST_MODULE_INDEX_URL.to_string()")]
    module_index_url: String,
    veritech_encryption_key_path: String,
    jwt_signing_public_key_path: String,
    #[builder(default = "JwtAlgo::RS256")]
    jwt_signing_public_key_algo: JwtAlgo,
    jwt_signing_private_key_path: String,
    postgres_key_path: String,
    #[builder(default)]
    pkgs_path: Option<PathBuf>,
    symmetric_crypto_service_config: SymmetricCryptoServiceConfig,
    #[builder(default = "si_layer_cache::default_pg_pool_config()")]
    layer_cache_pg_pool: PgPoolConfig,
    #[builder(default = "audit_database::default_pg_pool_config()")]
    audit_pg_pool: PgPoolConfig,
    #[builder(default = "si_layer_cache::ObjectStorageConfig::default()")]
    object_storage_config: si_layer_cache::ObjectStorageConfig,
}

impl Config {
    // Builds the default test configuration, then applies `SI_TEST_*` environment variable
    // overrides to the NATS, dal PG, layer-cache PG, audit PG, module-index, and S3 settings.
    #[allow(clippy::disallowed_methods)] // Environment variables are used exclusively in test and
    // all are prefixed with `SI_TEST_`
    fn create_default(
        pg_dbname: &'static str,
        layer_cache_pg_dbname: &'static str,
        audit_pg_dbname: &'static str,
    ) -> Result<Self> {
        let mut config = {
            let mut builder = ConfigBuilder::default();
            detect_and_configure_testing(&mut builder)?;
            builder.build()?
        };

        if let Ok(value) = env::var(ENV_VAR_NATS_URL) {
            config.nats.url = value;
        }

        // Dal database pool overrides.
        {
            config.pg.dbname =
                env::var(ENV_VAR_PG_DBNAME).unwrap_or_else(|_| pg_dbname.to_string());
            if let Ok(value) = env::var(ENV_VAR_PG_HOSTNAME) {
                config.pg.hostname = value;
            }
            config.pg.user =
                env::var(ENV_VAR_PG_USER).unwrap_or_else(|_| DEFAULT_TEST_PG_USER.to_string());
            config.pg.port = env::var(ENV_VAR_PG_PORT)
                .unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
                .parse()?;
            config.pg.pool_max_size = 16;
            config.pg.certificate = Some(CertificateSource::Path(
                config.postgres_key_path.clone().try_into()?,
            ));
        }

        // Layer-cache database pool overrides (same host/user/port env vars as the dal pool).
        {
            config.layer_cache_pg_pool.dbname = env::var(ENV_VAR_LAYER_CACHE_PG_DBNAME)
                .unwrap_or_else(|_| layer_cache_pg_dbname.to_string());
            if let Ok(value) = env::var(ENV_VAR_PG_HOSTNAME) {
                config.layer_cache_pg_pool.hostname = value;
            }
            config.layer_cache_pg_pool.user =
                env::var(ENV_VAR_PG_USER).unwrap_or_else(|_| DEFAULT_TEST_PG_USER.to_string());
            config.layer_cache_pg_pool.port = env::var(ENV_VAR_PG_PORT)
                .unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
                .parse()?;
            config.layer_cache_pg_pool.pool_max_size = 16;
            config.layer_cache_pg_pool.certificate = Some(CertificateSource::Path(
                config.postgres_key_path.clone().try_into()?,
            ));
        }

        // Audit database pool overrides (same host/user/port env vars as the dal pool).
        {
            config.audit_pg_pool.dbname =
                env::var(ENV_VAR_AUDIT_PG_DBNAME).unwrap_or_else(|_| audit_pg_dbname.to_string());
            if let Ok(value) = env::var(ENV_VAR_PG_HOSTNAME) {
                config.audit_pg_pool.hostname = value;
            }
            config.audit_pg_pool.user =
                env::var(ENV_VAR_PG_USER).unwrap_or_else(|_| DEFAULT_TEST_PG_USER.to_string());
            config.audit_pg_pool.port = env::var(ENV_VAR_PG_PORT)
                .unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
                .parse()?;
            config.audit_pg_pool.pool_max_size = 16;
            config.audit_pg_pool.certificate = Some(CertificateSource::Path(
                config.postgres_key_path.clone().try_into()?,
            ));
        }

        if let Ok(value) = env::var(ENV_VAR_MODULE_INDEX_URL) {
            config.module_index_url = value;
        }
        if let Ok(value) = env::var(ENV_VAR_S3_ENDPOINT) {
            config.object_storage_config.endpoint = value;
        }
        if let Ok(value) = env::var(ENV_VAR_S3_BUCKET_PREFIX) {
            config.object_storage_config.bucket_prefix = value;
        }
        if let Ok(value) = env::var(ENV_VAR_S3_REGION) {
            config.object_storage_config.region = value;
        }

        // Update S3 auth credentials if environment variables are set
        if let si_layer_cache::s3::S3AuthConfig::StaticCredentials {
            access_key,
            secret_key,
        } = &config.object_storage_config.auth
        {
            let new_access_key =
                env::var(ENV_VAR_S3_ACCESS_KEY).unwrap_or_else(|_| access_key.to_string());
            let new_secret_key =
                env::var(ENV_VAR_S3_SECRET_KEY).unwrap_or_else(|_| secret_key.to_string());
            config.object_storage_config.auth =
                si_layer_cache::s3::S3AuthConfig::StaticCredentials {
                    access_key: new_access_key.into(),
                    secret_key: new_secret_key.into(),
                };
        }

        debug!(?config, "test config");
        Ok(config)
    }
}

// State machine for the process-wide test setup held in `TEST_CONTEXT_BUILDER`:
// `Uninitialized` -> `Created` on success, or `Errored` (sticky) on any failure so later
// tests fail fast instead of re-running a broken global setup.
#[remain::sorted]
#[allow(clippy::large_enum_variant)]
enum ContextBuilderState {
    Created(TestContextBuilder),
    Errored(Cow<'static, str>),
    Uninitialized,
}

impl ContextBuilderState {
    fn created(builder: TestContextBuilder) -> Self {
        Self::Created(builder)
    }

    fn errored(message: impl Into<Cow<'static, str>>) -> Self {
        Self::Errored(message.into())
    }

    // Returns the global config, or an error describing why setup is unavailable.
    fn config(&self) -> Result<&Config> {
        match self {
            Self::Created(builder) => Ok(&builder.config),
            Self::Errored(msg) => Err(eyre!("global setup has failed: {msg}")),
            Self::Uninitialized => Err(eyre!("global setup is uninitialized")),
        }
    }
}

impl Default for ContextBuilderState {
    fn default() -> Self {
        Self::Uninitialized
    }
}

/// A context used for preparing and running tests containing DAL objects.
#[derive(Clone, Debug)]
pub struct TestContext {
    /// The test context configuration used to build this instance.
    config: Config,
    /// A PostgreSQL connection pool.
    pg_pool: PgPool,
    /// A connected NATS client
    nats_conn: NatsClient,
    /// Required NATS streams
    nats_streams: JetstreamStreams,
    /// A [`JobQueueProcessor`] impl
    job_processor: Box<dyn JobQueueProcessor + Send + Sync>,
    /// A key for re-encrypting messages to the function execution system.
    encryption_key: Arc<VeritechEncryptionKey>,
    /// A service that can encrypt values based on the loaded keys
    symmetric_crypto_service: SymmetricCryptoService,
    /// The pg_pool for the layer db
    layer_db_pg_pool: PgPool,
    /// Dedicated executor for running CPU-intensive tasks
    compute_executor: DedicatedExecutor,
    /// The audit database context
    audit_database_context: AuditDatabaseContext,
}

impl TestContext {
    /// Builds and returns a suitable [`TestContext`] from a global configuration which is ready to
    /// run tests.
    ///
    /// # Implementation Details
    ///
    /// This function wraps over a mutex which ensures that only the first caller will run global
    /// database creation, migrations, and other preparations.
    #[allow(clippy::disallowed_methods)]
    pub async fn global(
        pg_dbname: &'static str,
        layer_cache_pg_dbname: &'static str,
        audit_pg_dbname: &'static str,
    ) -> Result<Self> {
        // Hold the lock for the whole match so concurrent callers serialize on global setup.
        let mut mutex_guard = TEST_CONTEXT_BUILDER.lock().await;
        match &*mutex_guard {
            ContextBuilderState::Uninitialized => {
                let config =
                    Config::create_default(pg_dbname, layer_cache_pg_dbname, audit_pg_dbname)
                        .si_inspect_err(|err| {
                            // Record the failure so later tests abort immediately.
                            *mutex_guard = ContextBuilderState::errored(err.to_string())
                        })?;
                // We want to connect directly when we migrate, then connect to the pool after
                let mut migrate_config = config.clone();
                if env::var_os("USE_CI_PG_SETUP").is_some() {
                    // CI: direct connection to the `db-test` host on the standard PG port.
                    migrate_config.pg.hostname = "db-test".to_string();
                    migrate_config.pg.port = 5432;
                    migrate_config.layer_cache_pg_pool.hostname = "db-test".to_string();
                    migrate_config.layer_cache_pg_pool.port = 5432;
                    migrate_config.audit_pg_pool.hostname = "db-test".to_string();
                    migrate_config.audit_pg_pool.port = 5432;
                } else {
                    // Local dev: direct (non-pooled) port 8432 for migrations.
                    migrate_config.pg.port = 8432;
                    migrate_config.layer_cache_pg_pool.port = 8432;
                    migrate_config.audit_pg_pool.port = 8432;
                }
                let migrate_test_context_builder = TestContextBuilder::create(migrate_config)
                    .await
                    .si_inspect_err(|err| {
                        *mutex_guard = ContextBuilderState::errored(err.to_string());
                    })?;
                // The stack gets too deep here, so we'll spawn the work as a task with a new
                // thread stack just for the global setup
                let handle = tokio::spawn(global_setup(migrate_test_context_builder));
                // Join this task and wait on its completion
                match handle.await {
                    // Global setup completed successfully
                    Ok(Ok(())) => {
                        debug!("task global_setup was successful");
                        // Re-create the builder with the original (pooled) config for tests.
                        let test_context_builder = TestContextBuilder::create(config)
                            .await
                            .si_inspect_err(|err| {
                                *mutex_guard = ContextBuilderState::errored(err.to_string());
                            })?;
                        *mutex_guard = ContextBuilderState::created(test_context_builder.clone());
                        test_context_builder.build_for_test().await
                    }
                    // Global setup errored
                    Ok(Err(err)) => {
                        *mutex_guard = ContextBuilderState::errored(err.to_string());
                        Err(err)
                    }
                    // Tokio task panicked or was cancelled
                    Err(err) => {
                        if err.is_panic() {
                            error!(error = %err, "spawned task global_setup panicked!");
                        } else if err.is_cancelled() {
                            error!(error = %err, "spawned task global_setup was cancelled!");
                        }
                        *mutex_guard = ContextBuilderState::errored(err.to_string());
                        Err(err.into())
                    }
                }
            }
            // Setup already ran: every later test just builds its own pools.
            ContextBuilderState::Created(builder) => builder.build_for_test().await,
            ContextBuilderState::Errored(message) => {
                error!(error = %message, "global setup failed, aborting test");
                Err(eyre!("global setup failed: {}", message))
            }
        }
    }

    /// Creates a new [`ServicesContext`].
    #[allow(clippy::expect_used, clippy::panic, clippy::disallowed_methods)]
    pub async fn create_services_context(
        &self,
        token: CancellationToken,
        tracker: TaskTracker,
    ) -> ServicesContext {
        let rebaser = rebaser_client::Client::new(self.nats_conn.clone())
            .await
            .expect("failed to create rebaser client");
        let veritech = veritech_client::Client::new(self.nats_conn.clone());

        // Persister mode comes from `SI_TEST_PERSISTER_MODE`; the value is wrapped in quotes so
        // it parses as a JSON string. Any unset/unparseable value falls back to the default.
        let persister_mode = env::var(ENV_VAR_PERSISTER_MODE)
            .ok()
            .and_then(|s| serde_json::from_str(&format!("\"{s}\"")).ok())
            .unwrap_or_default();

        let layer_db_config = si_layer_cache::db::LayerDbConfig {
            pg_pool_config: self.config.layer_cache_pg_pool.clone(),
            nats_config: self.config.nats.clone(),
            // Disk layer disabled for tests.
            cache_config: CacheConfig::default().disk_layer(false),
            object_storage_config: self.config.object_storage_config.clone(),
            persister_mode,
        };
        let (layer_db, layer_db_graceful_shutdown) = DalLayerDb::from_services(
            layer_db_config,
            self.layer_db_pg_pool.clone(),
            self.nats_conn.clone(),
            self.compute_executor.clone(),
            token,
        )
        .await
        .expect("could not create layer db in test context");
        // Shutdown future runs on the tracker so tests can await a clean teardown.
        tracker.spawn(layer_db_graceful_shutdown.into_future());

        ServicesContext::new(
            self.pg_pool.clone(),
            self.nats_conn.clone(),
            self.nats_streams.clone(),
            self.job_processor.clone(),
            rebaser,
            veritech,
            self.encryption_key.clone(),
            self.config.pkgs_path.to_owned(),
            None,
            self.symmetric_crypto_service.clone(),
            layer_db,
            FeatureFlagService::default(),
            self.compute_executor.clone(),
        )
    }

    /// Gets a reference to the NATS configuration.
    pub fn nats_config(&self) -> &NatsConfig {
        &self.config.nats
    }

    /// Gets a reference to the audit database context.
    pub fn audit_database_context(&self) -> &AuditDatabaseContext {
        &self.audit_database_context
    }

    /// Gets a reference to the NATS client.
    pub fn nats_conn(&self) -> &NatsClient {
        &self.nats_conn
    }
}

/// A builder for a [`TestContext`].
///
/// Each `TestContext` has an active connection pool to the database and messaging system, and
/// rather than share these single pools among global setup and all tests, a new set of
/// dedicated pools can be created as needed. This builder holds all other state other than the
/// pool-acquiring steps.
#[derive(Clone, Debug)]
struct TestContextBuilder {
    /// The test context configuration used to build this instance.
    config: Config,
    /// A key for re-encrypting messages to the function execution system.
    encryption_key: Arc<VeritechEncryptionKey>,
}

impl TestContextBuilder {
    /// Creates a new builder.
    async fn create(config: Config) -> Result<Self> {
        let encryption_key = Arc::new(
            VeritechEncryptionKey::load(&config.veritech_encryption_key_path)
                .await
                .wrap_err("failed to load EncryptionKey")?,
        );

        Ok(Self {
            config,
            encryption_key,
        })
    }

    /// Builds and returns a new [`TestContext`] with its own connection pooling for global setup.
    async fn build_for_global(&self) -> Result<TestContext> {
        let pg_pool = PgPool::new(&self.config.pg)
            .await
            .wrap_err("failed to create global setup PgPool")?;
        let layer_cache_pg_pool = PgPool::new(&self.config.layer_cache_pg_pool).await?;
        let audit_pg_pool = PgPool::new(&self.config.audit_pg_pool).await?;

        self.build_inner(pg_pool, layer_cache_pg_pool, audit_pg_pool)
            .await
    }

    /// Builds and returns a new [`TestContext`] with its own connection pooling for each test.
async fn build_for_test(&self) -> Result<TestContext> { let pg_pool = self .create_test_specific_db_with_pg_pool(&self.config.pg) .await?; let layer_cache_pg_pool = self .create_test_specific_db_with_pg_pool(&self.config.layer_cache_pg_pool) .await?; let audit_pg_pool = self .create_test_specific_db_with_pg_pool(&self.config.audit_pg_pool) .await?; self.build_inner(pg_pool, layer_cache_pg_pool, audit_pg_pool) .await } async fn build_inner( &self, pg_pool: PgPool, layer_db_pg_pool: PgPool, audit_pg_pool: PgPool, ) -> Result<TestContext> { let universal_prefix = random_identifier_string(); // Need to make a new NatsConfig so that we can add the test-specific subject prefix // without leaking it to other tests. let mut nats_config = self.config.nats.clone(); nats_config.subject_prefix = Some(universal_prefix.clone()); let mut config = self.config.clone(); config.nats.subject_prefix = Some(universal_prefix.clone()); config.object_storage_config.key_prefix = Some(universal_prefix.clone()); let nats_conn = NatsClient::new(&nats_config) .await .wrap_err("failed to create NatsClient")?; let nats_streams = JetstreamStreams::new(nats_conn.clone()) .await .wrap_err("failed to create NatsStreams")?; let job_processor = Box::new( NatsProcessor::new(nats_conn.clone()) .await .wrap_err("failed to create NatsProcessor")?, ) as Box<dyn JobQueueProcessor + Send + Sync>; let symmetric_crypto_service = SymmetricCryptoService::from_config(&self.config.symmetric_crypto_service_config) .await?; let compute_executor = si_runtime::compute_executor("dal-test")?; let audit_database_context = AuditDatabaseContext::from_pg_pool(audit_pg_pool); Ok(TestContext { config, pg_pool, nats_conn, nats_streams, job_processor, encryption_key: self.encryption_key.clone(), symmetric_crypto_service, layer_db_pg_pool, compute_executor, audit_database_context, }) } async fn create_test_specific_db_with_pg_pool( &self, pg_pool_config: &PgPoolConfig, ) -> Result<PgPool> { // Connect to the 'postgres' database 
so we can copy our migrated template test database let mut new_pg_pool_config = pg_pool_config.clone(); new_pg_pool_config.dbname = "postgres".to_string(); let new_pg_pool = PgPool::new(&new_pg_pool_config) .await .wrap_err("failed to create PgPool to db 'postgres'")?; let db_conn = new_pg_pool .get() .await .wrap_err("failed to connect to db 'postgres'")?; // Create new database from template let db_name_suffix = random_identifier_string(); let dbname = format!("{}_{}", pg_pool_config.dbname, db_name_suffix); let query = format!( "CREATE DATABASE {dbname} WITH TEMPLATE {} OWNER {};", pg_pool_config.dbname, pg_pool_config.user, ); let db_exists_check = db_conn .query_opt( "SELECT datname FROM pg_database WHERE datname = $1", &[&dbname], ) .await?; if db_exists_check.is_none() { info!(dbname = %dbname, "creating test-specific database"); db_conn .execute(&query, &[]) .instrument(debug_span!("creating test database from template")) .await .wrap_err("failed to create test specific database")?; } else { info!(dbname = %dbname, "test-specific database already exists"); } // This is ugly, but we pretty much always want to know which test DB is used for // any given test when it fails, and the logging/tracing macros aren't captured // (or displayed) during tests, while `println!(...)` will be captured the same as // "normal" test output, meaning it respects --nocapture and being displayed for // failing tests. info!("Test database: {}", &dbname); // Return new PG pool that uess the new datatbase new_pg_pool_config.dbname = dbname; PgPool::new(&new_pg_pool_config) .await .wrap_err("failed to create PgPool to db 'postgres'") } } /// Generates a new pseudo-random NATS subject prefix. pub fn random_identifier_string() -> String { Uuid::new_v4().as_simple().to_string() } /// Returns a JWT public signing key, which is used to verify claims. 
pub async fn jwt_public_signing_key() -> Result<JwtPublicSigningKeyChain> {
    // Read the key path/algo from the global test config (requires global setup to have run).
    let jwt_config = {
        let context_builder = TEST_CONTEXT_BUILDER.lock().await;
        let config = context_builder.config()?;
        let key_file = Some(CanonicalFile::from_str(
            &config.jwt_signing_public_key_path,
        )?);
        JwtConfig {
            key_file,
            key_base64: None,
            algo: config.jwt_signing_public_key_algo,
        }
    };
    let key = JwtPublicSigningKeyChain::from_config(jwt_config, None).await?;

    Ok(key)
}

/// Returns a JWT private signing key, which is used to sign claims.
#[allow(clippy::expect_used, clippy::panic)]
pub async fn jwt_private_signing_key() -> Result<RS256KeyPair> {
    // Take the path under the lock, then drop the guard before doing file I/O.
    let key_path = {
        let context_builder = TEST_CONTEXT_BUILDER.lock().await;
        let config = context_builder.config()?;
        config.jwt_signing_private_key_path.clone()
    };
    let key_str = {
        let mut file = File::open(key_path)
            .await
            .wrap_err("failed to open RSA256 key file")?;
        let mut buf = String::new();
        file.read_to_string(&mut buf)
            .await
            .wrap_err("failed to read from RSA256 file")?;
        buf
    };

    let key_pair = RS256KeyPair::from_pem(&key_str).expect("failed to parse RSA256 from pem file");

    Ok(key_pair)
}

/// Configures and builds a [`pinga_server::Server`] suitable for running alongside DAL
/// object-related tests.
pub async fn pinga_server(
    services_context: ServicesContext,
    shutdown_token: CancellationToken,
) -> Result<pinga_server::Server> {
    let config: pinga_server::Config = {
        let mut config_file = pinga_server::ConfigFile::default();
        pinga_server::detect_and_configure_development(&mut config_file)
            .wrap_err("failed to detect and configure Pinga ConfigFile")?;
        config_file
            .try_into()
            .wrap_err("failed to build Pinga server config")?
    };

    let server = pinga_server::Server::from_services(
        config.instance_id(),
        config.concurrency_limit(),
        config.max_deliver(),
        services_context,
        shutdown_token,
    )
    .await
    .wrap_err("failed to create Pinga server")?;

    Ok(server)
}

/// Configures and builds an [`edda_server::Server`] suitable for running alongside DAL
/// object-related tests.
pub async fn edda_server(
    services_context: ServicesContext,
    shutdown_token: CancellationToken,
) -> Result<edda_server::Server> {
    let config: edda_server::Config = {
        let mut config_file = edda_server::ConfigFile::default();
        edda_server::detect_and_configure_development(&mut config_file)
            .wrap_err("failed to detect and configure Edda ConfigFile")?;
        config_file
            .try_into()
            .wrap_err("failed to build Edda server config")?
    };

    // Edda uses the same NATS connection as the services context for its frigg client.
    let frigg_nats = services_context.nats_conn().clone();

    let server = edda_server::Server::from_services(
        config.instance_id(),
        config.concurrency_limit(),
        config.parallel_build_limit(),
        config.streaming_patches(),
        services_context,
        frigg_nats,
        config.quiescent_period(),
        shutdown_token,
    )
    .await
    .wrap_err("failed to create Edda server")?;

    Ok(server)
}

/// Configures and builds a [`rebaser_server::Server`] suitable for running alongside DAL
/// object-related tests.
pub async fn rebaser_server(
    services_context: ServicesContext,
    shutdown_token: CancellationToken,
) -> Result<rebaser_server::Server> {
    let config: rebaser_server::Config = rebaser_server::ConfigFile::default()
        .try_into()
        .wrap_err("failed to build Rebaser server config")?;

    let server = rebaser_server::Server::from_services(
        config.instance_id(),
        None,
        services_context,
        config.quiescent_period(),
        shutdown_token,
        config.features(),
    )
    .await
    .wrap_err("failed to create Rebaser server")?;

    Ok(server)
}

/// Configures and builds a [`veritech_server::Server`] suitable for running alongside DAL
/// object-related tests.
pub async fn veritech_server_for_uds_cyclone(
    nats_config: NatsConfig,
    token: CancellationToken,
) -> Result<veritech_server::Server> {
    let config: veritech_server::Config = {
        let mut config_file = veritech_server::ConfigFile::default_local_uds();
        config_file.nats = nats_config;
        // Small pool and no heartbeat app for the test environment.
        config_file.cyclone.set_pool_size(4);
        config_file.heartbeat_app = false;
        veritech_server::detect_and_configure_development(&mut config_file)
            .wrap_err("failed to detect and configure Veritech ConfigFile")?;
        config_file
            .try_into()
            .wrap_err("failed to build Veritech server config")?
    };

    let (server, _disabled_heartbeat_app) = veritech_server::Server::from_config(config, token)
        .await
        .wrap_err("failed to create Veritech server")?;

    Ok(server)
}

/// Configures and builds a [`forklift_server::Server`] suitable for running alongside DAL
/// object-related tests.
pub async fn forklift_server(
    nats: NatsClient,
    audit_database_context: AuditDatabaseContext,
    token: CancellationToken,
) -> Result<forklift_server::Server> {
    let config: forklift_server::Config = forklift_server::ConfigFile::default()
        .try_into()
        .wrap_err("failed to build forklift server config")?;

    let connection_metadata = Arc::new(nats.metadata().to_owned());
    let jetstream_context = jetstream::new(nats.clone());

    // Initialize pools for eviction task (using same pattern as forklift from_config)
    let si_db_pool = si_data_pg::PgPool::new(&config.snapshot_eviction().si_db)
        .await
        .wrap_err("failed to create si-db pool for eviction")?;
    let layer_cache_pool = si_data_pg::PgPool::new(&config.snapshot_eviction().layer_cache_pg)
        .await
        .wrap_err("failed to create layer-cache pool for eviction")?;

    // Create LayeredEventClient (following forklift pattern)
    let instance_id_ulid = ulid::Ulid::from_string(config.instance_id())
        .wrap_err("failed to parse instance_id as ULID")?;
    let layered_event_client = si_layer_cache::event::LayeredEventClient::new(
        nats.metadata().subject_prefix().map(|s| s.to_owned()),
        instance_id_ulid,
        jetstream_context.clone(),
    );

    // Validate and clamp eviction config
    let mut eviction_config = config.snapshot_eviction().clone();
    eviction_config.validate_and_clamp();

    let server = forklift_server::Server::from_services(
        connection_metadata,
        jetstream_context,
        config.instance_id(),
        config.concurrency_limit(),
        Some((
            audit_database_context,
            config.audit().insert_concurrency_limit,
        )),
        None,
        si_db_pool,
        layer_cache_pool,
        layered_event_client,
        eviction_config,
        token,
    )
    .await
    .wrap_err("failed to create forklift server")?;

    Ok(server)
}

// One-time, process-wide setup: connects directly to the databases, wipes and re-migrates
// them, and installs the template schemas that per-test databases are copied from.
async fn global_setup(test_context_builer: TestContextBuilder) -> Result<()> {
    info!("running global test setup");
    let test_context = test_context_builer.build_for_global().await?;

    // We need to be the only person connected to the real database. This drops all connections
    // that aren't this one from the database. This disconnects the PgBouncers, any client
    // terminals, and anyone else - ensuring we always get the global template to ourselves.
    //
    // PG is the best.
    //
    // Since we are connected to the same server, we only need to run this on one pool.
let conn = test_context.pg_pool.get().await?; conn.query( "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid();", &[], ) .await?; debug!("initializing crypto"); sodiumoxide::init().map_err(|_| eyre!("failed to init sodiumoxide crypto"))?; let token = CancellationToken::new(); let tracker = TaskTracker::new(); // Create a `ServicesContext` let services_ctx = test_context .create_services_context(token.clone(), tracker.clone()) .await; info!("creating client with pg pool for global Content Store test database"); info!("testing database connection"); services_ctx .pg_pool() .test_connection() .await .wrap_err("failed to connect to database, is it running and available?")?; #[allow(clippy::disallowed_methods)] // Environment variables are used exclusively in test and // all are prefixed with `SI_TEST_` if !env::var(ENV_VAR_KEEP_OLD_DBS).is_ok_and(|v| !v.is_empty()) { info!("dropping old test-specific databases for dal"); drop_old_test_databases(services_ctx.pg_pool()) .await .wrap_err("failed to drop old databases")?; info!("dropping old test-specific layerdb databases"); drop_old_test_databases(services_ctx.layer_db().pg_pool()) .await .wrap_err("failed to drop old test-specific content store databases")?; } // Ensure the database is totally clean, then run all migrations info!("dropping and re-creating the database schema"); services_ctx .pg_pool() .drop_and_create_public_schema() .await .wrap_err("failed to drop and create the database")?; services_ctx .layer_db() .pg_pool() .drop_and_create_public_schema() .await .wrap_err("failed to drop and create layer db database")?; info!("running database migrations"); { si_db::migrate::migrate(services_ctx.pg_pool()) .await .wrap_err("failed to migrate database")?; services_ctx .layer_db() .pg_migrate() .await .wrap_err("failed to migrate layerdb")?; audit_database::migrate(&test_context.audit_database_context) .await .wrap_err("failed to migrate audit database")?; } // Startup up a Forklift 
server exclusively for migrations info!("starting Forklift server for initial migrations"); let forklift_server = forklift_server( test_context.nats_conn.to_owned(), test_context.audit_database_context.to_owned(), token.clone(), ) .await?; tracker.spawn(forklift_server.run()); // Start up a Pinga server as a task exclusively to allow the migrations to run info!("starting Pinga server for initial migrations"); let srv_services_ctx = test_context .create_services_context(token.clone(), tracker.clone()) .await; let pinga_server = pinga_server(srv_services_ctx, token.clone()).await?; tracker.spawn(pinga_server.run()); // Start up an Edda server as a task exclusively to allow the migrations to run info!("starting Edda server for initial migrations"); let srv_services_ctx = test_context .create_services_context(token.clone(), tracker.clone()) .await; let edda_server = edda_server(srv_services_ctx, token.clone()).await?; tracker.spawn(edda_server.run()); // Start up a Rebaser server for migrations info!("starting Rebaser server for initial migrations"); let srv_services_ctx = test_context .create_services_context(token.clone(), tracker.clone()) .await; let rebaser_server = rebaser_server(srv_services_ctx, token.clone()).await?; tracker.spawn(rebaser_server.run()); // Start up a Veritech server as a task exclusively to allow the migrations to run info!("starting Veritech server for initial migrations"); let veritech_server = veritech_server_for_uds_cyclone(test_context.config.nats.clone(), token.clone()).await?; tracker.spawn(veritech_server.run()); tracker.close(); #[allow(clippy::expect_used)] let pkgs_path = test_context .config .pkgs_path .to_owned() .expect("no pkgs path configured"); info!("creating builtins"); migrate_local_builtins( services_ctx.pg_pool(), services_ctx.nats_conn(), services_ctx.jetstream_streams(), services_ctx.job_processor(), services_ctx.rebaser().clone(), services_ctx.veritech().clone(), &services_ctx.encryption_key(), pkgs_path, 
test_context.config.module_index_url.clone(), services_ctx.symmetric_crypto_service(), services_ctx.layer_db().clone(), services_ctx.feature_flags_service().clone(), services_ctx.compute_executor().clone(), ) .await .wrap_err("failed to run builtin migrations")?; // Cancel and wait for all outstanding tasks to complete info!("shutting down dependent services"); token.cancel(); tracker.wait().await; info!("global test setup complete"); Ok(()) } #[allow(clippy::too_many_arguments)] #[instrument(level = "info", skip_all)] async fn migrate_local_builtins( dal_pg: &PgPool, nats: &NatsClient, nats_streams: &JetstreamStreams, job_processor: Box<dyn JobQueueProcessor + Send + Sync>, rebaser: rebaser_client::Client, veritech: veritech_client::Client, encryption_key: &VeritechEncryptionKey, pkgs_path: PathBuf, module_index_url: String, symmetric_crypto_service: &SymmetricCryptoService, layer_db: DalLayerDb, feature_flag_service: FeatureFlagService, compute_executor: DedicatedExecutor, ) -> Result<()> { let services_context = ServicesContext::new( dal_pg.clone(), nats.clone(), nats_streams.clone(), job_processor, rebaser, veritech, Arc::new(*encryption_key), Some(pkgs_path), Some(module_index_url), symmetric_crypto_service.clone(), layer_db.clone(), feature_flag_service, compute_executor, ); let dal_context = services_context.into_builder(true); let mut ctx = dal_context.build_default(None).await?; info!("setup builtin workspace"); Workspace::setup_builtin(&mut ctx).await?; info!("migrating intrinsic functions"); func::migrate_intrinsics_for_tests(&ctx).await?; info!("migrating test exclusive schemas"); test_exclusive_schemas::migrate(&ctx).await?; info!("migrations complete, commiting"); ctx.blocking_commit().await?; Ok(()) } async fn drop_old_test_databases(pg_pool: &PgPool) -> Result<()> { let name_prefix = format!("{}_%", pg_pool.db_name()); let pg_conn = pg_pool.get().await?; let rows = pg_conn .query( "SELECT datname FROM pg_database WHERE datname LIKE $1", 
&[&name_prefix.as_str()], ) .await?; for row in rows { let dbname: String = row.try_get("datname")?; debug!(db_name = %dbname, "dropping database"); pg_conn .execute(&format!("DROP DATABASE IF EXISTS {dbname}"), &[]) .await?; } Ok(()) } #[allow(clippy::disallowed_methods)] // Used to determine if running in testing fn detect_and_configure_testing(builder: &mut ConfigBuilder) -> Result<()> { if env::var("BUCK_RUN_BUILD_ID").is_ok() || env::var("BUCK_BUILD_ID").is_ok() { detect_and_configure_testing_for_buck2(builder) } else if let Ok(dir) = env::var("CARGO_MANIFEST_DIR") { detect_and_configure_testing_for_cargo(dir, builder) } else { unimplemented!("tests must be run either with Cargo or Buck2"); } } fn detect_and_configure_testing_for_buck2(builder: &mut ConfigBuilder) -> Result<()> { let resources = Buck2Resources::read()?; let veritech_encryption_key_path = resources .get_ends_with("dev.encryption.key")? .to_string_lossy() .to_string(); let jwt_signing_public_key_path = resources .get_ends_with("dev.jwt_signing_public_key.pem")? .to_string_lossy() .to_string(); let jwt_signing_private_key_path = resources .get_ends_with("dev.jwt_signing_private_key.pem")? .to_string_lossy() .to_string(); let symmetric_crypto_service_key = resources .get_ends_with("dev.donkey.key")? .to_string_lossy() .to_string(); let postgres_key = resources .get_ends_with("dev.postgres.root.crt")? .to_string_lossy() .to_string(); let pkgs_path = resources .get_ends_with("pkgs_path")? 
.to_string_lossy() .to_string(); warn!( veritech_encryption_key_path = veritech_encryption_key_path.as_str(), jwt_signing_private_key_path = jwt_signing_private_key_path.as_str(), jwt_signing_public_key_path = jwt_signing_public_key_path.as_str(), symmetric_crypto_service_key = symmetric_crypto_service_key.as_str(), postgres_key = postgres_key.as_str(), pkgs_path = pkgs_path.as_str(), "detected development run", ); builder.veritech_encryption_key_path(veritech_encryption_key_path); builder.jwt_signing_public_key_path(jwt_signing_public_key_path); builder.jwt_signing_private_key_path(jwt_signing_private_key_path); builder.symmetric_crypto_service_config( SymmetricCryptoServiceConfigFile { active_key: Some(symmetric_crypto_service_key), active_key_base64: None, extra_keys: vec![], } .try_into()?, ); builder.postgres_key_path(postgres_key); builder.pkgs_path(Some(pkgs_path.into())); Ok(()) } fn detect_and_configure_testing_for_cargo(dir: String, builder: &mut ConfigBuilder) -> Result<()> { let veritech_encryption_key_path = Path::new(&dir) .join("../../lib/veritech-server/src/dev.encryption.key") .to_string_lossy() .to_string(); let jwt_signing_public_key_path = Path::new(&dir) .join("../../config/keys/dev.jwt_signing_public_key.pem") .to_string_lossy() .to_string(); let jwt_signing_private_key_path = Path::new(&dir) .join("../../config/keys/dev.jwt_signing_private_key.pem") .to_string_lossy() .to_string(); let symmetric_crypto_service_key = Path::new(&dir) .join("../../lib/dal/dev.donkey.key") .to_string_lossy() .to_string(); let postgres_key = Path::new(&dir) .join("../../config/keys/dev.postgres.root.crt") .to_string_lossy() .to_string(); let pkgs_path = Path::new(&dir) .join("../../pkgs") .to_string_lossy() .to_string(); warn!( veritech_encryption_key_path = veritech_encryption_key_path.as_str(), jwt_signing_private_key_path = jwt_signing_private_key_path.as_str(), jwt_signing_public_key_path = jwt_signing_public_key_path.as_str(), symmetric_crypto_service_key = 
symmetric_crypto_service_key.as_str(), postgres_key = postgres_key.as_str(), pkgs_path = pkgs_path.as_str(), "detected development run", ); builder.veritech_encryption_key_path(veritech_encryption_key_path); builder.jwt_signing_public_key_path(jwt_signing_public_key_path); builder.jwt_signing_private_key_path(jwt_signing_private_key_path); builder.symmetric_crypto_service_config( SymmetricCryptoServiceConfigFile { active_key: Some(symmetric_crypto_service_key), active_key_base64: None, extra_keys: vec![], } .try_into()?, ); builder.postgres_key_path(postgres_key); builder.pkgs_path(Some(pkgs_path.into())); Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.