Skip to main content
Glama
init.rs7.66 kB
use std::{ path::PathBuf, sync::Arc, }; use dal::{ DalLayerDb, DedicatedExecutor, JetstreamStreams, JobQueueProcessor, NatsProcessor, ServicesContext, feature_flags::FeatureFlagService, }; use rebaser_client::RebaserClient; use si_crypto::{ SymmetricCryptoService, SymmetricCryptoServiceConfig, VeritechCryptoConfig, VeritechEncryptionKey, }; use si_data_nats::{ NatsClient, NatsConfig, }; use si_data_pg::{ PgPool, PgPoolConfig, }; use si_jwt_public_key::{ JwtConfig, JwtPublicSigningKeyChain, JwtPublicSigningKeyError, }; use si_layer_cache::{ LayerDb, db::{ LayerDbConfig, LayerDbGracefulShutdown, }, }; use si_posthog::{ PosthogClient, PosthogConfig, PosthogSender, }; use telemetry::prelude::*; use thiserror::Error; use tokio_util::sync::CancellationToken; use crate::Config; #[remain::sorted] #[derive(Debug, Error)] pub enum InitError { #[error("compute executor initialization error: {0}")] ComputeExecutor(#[from] dal::DedicatedExecutorInitializeError), #[error("initialization error: {0}")] DalInitialization(#[from] dal::InitializationError), #[error("failed to initialize a dal jetstream streams: {0}")] DalJetstreamStreams(#[source] dal::JetstreamStreamsError), #[error("job queue processor error: {0}")] DalJobQueueProcessor(#[from] dal::job::processor::JobQueueProcessorError), #[error("jwt key error")] JwtKey(#[from] JwtPublicSigningKeyError), #[error("layer cache error: {0}")] LayerCache(#[from] si_layer_cache::LayerDbError), #[error("failed to initialize a nats client: {0}")] NatsClient(#[source] si_data_nats::NatsError), #[error("pg pool error: {0}")] PgPool(#[from] Box<si_data_pg::PgPoolError>), #[error("posthog client error: {0}")] Posthog(#[from] si_posthog::PosthogError), #[error("rebaser client error: {0}")] Rebaser(#[from] rebaser_client::ClientError), #[error("symmetric crypto error: {0}")] SymmetricCryptoService(#[from] si_crypto::SymmetricCryptoError), #[error("error when loading cyclone encryption key: {0}")] VeritechEncryptionKey(#[from] si_crypto::VeritechEncryptionKeyError), } impl From<si_data_pg::PgPoolError> for InitError { fn from(e: si_data_pg::PgPoolError) -> Self { Self::PgPool(Box::new(e)) } } type InitResult<T> = std::result::Result<T, InitError>; pub(crate) async fn services_context_from_config( config: &Config, helping_tasks_token: CancellationToken, ) -> InitResult<(ServicesContext, LayerDbGracefulShutdown)> { dal::init()?; let encryption_key = load_encryption_key(config.crypto().clone()).await?; let nats = connect_to_nats(config.nats()).await?; let jetstream_streams = get_or_create_jetstream_streams(nats.clone()).await?; let pg_pool = create_pg_pool(config.pg_pool()).await?; let rebaser = create_rebaser_client(nats.clone()).await?; let veritech = create_veritech_client(nats.clone()); let job_processor = create_job_processor(nats.clone()).await?; let symmetric_crypto_service = create_symmetric_crypto_service(config.symmetric_crypto_service()).await?; let pkgs_path: PathBuf = config.pkgs_path().into(); let module_index_url = Some(config.module_index_url().to_string()); let feature_flags_service = FeatureFlagService::new(config.boot_feature_flags().clone()); let compute_executor = create_compute_executor()?; let (layer_db, layer_db_graceful_shutdown) = initialize_layer_db( config.layer_db_config().clone(), compute_executor.clone(), helping_tasks_token.clone(), ) .await?; let services_context = ServicesContext::new( pg_pool, nats.clone(), jetstream_streams, job_processor, rebaser, veritech, encryption_key, Some(pkgs_path), module_index_url, symmetric_crypto_service, layer_db, feature_flags_service, compute_executor, ); Ok((services_context, layer_db_graceful_shutdown)) } #[instrument(name = "sdf.init.load_encryption_key", level = "info", skip_all)] pub(crate) async fn load_encryption_key( crypto_config: VeritechCryptoConfig, ) -> InitResult<Arc<VeritechEncryptionKey>> { Ok(Arc::new( VeritechEncryptionKey::from_config(crypto_config).await?, )) } #[instrument(name = "sdf.init.connect_to_nats", level = "info", skip_all)] pub(crate) async fn connect_to_nats(nats_config: &NatsConfig) -> InitResult<NatsClient> { let client = NatsClient::new(nats_config) .await .map_err(InitError::NatsClient)?; debug!("successfully connected nats client"); Ok(client) } #[instrument( name = "sdf.init.get_or_create_jetstream_streams", level = "info", skip_all )] pub(crate) async fn get_or_create_jetstream_streams( client: NatsClient, ) -> InitResult<JetstreamStreams> { let streams = JetstreamStreams::new(client) .await .map_err(InitError::DalJetstreamStreams)?; debug!("created jetstream streams"); Ok(streams) } #[instrument(name = "sdf.init.create_pg_pool", level = "info", skip_all)] pub(crate) async fn create_pg_pool(pg_pool_config: &PgPoolConfig) -> InitResult<PgPool> { let pool = PgPool::new(pg_pool_config).await?; debug!("successfully started pg pool (note that not all connections may be healthy)"); Ok(pool) } #[instrument(name = "sdf.init.create_rebaser_client", level = "info", skip_all)] async fn create_rebaser_client(nats: NatsClient) -> InitResult<RebaserClient> { let client = RebaserClient::new(nats).await?; debug!("successfully initialized the rebaser client"); Ok(client) } pub(crate) fn create_veritech_client(nats: NatsClient) -> veritech_client::Client { veritech_client::Client::new(nats) } #[instrument(name = "sdf.init.create_compute_executor", level = "info", skip_all)] pub(crate) fn create_compute_executor() -> InitResult<DedicatedExecutor> { dal::compute_executor("sdf").map_err(Into::into) } #[instrument(name = "sdf.init.create_job_processor", level = "info", skip_all)] pub(crate) async fn create_job_processor( nats: NatsClient, ) -> InitResult<Box<dyn JobQueueProcessor + Send + Sync>> { Ok(Box::new(NatsProcessor::new(nats).await?) as Box<dyn JobQueueProcessor + Send + Sync>) } #[instrument( name = "sdf.init.create_symmetric_crypto_service", level = "info", skip_all )] pub(crate) async fn create_symmetric_crypto_service( config: &SymmetricCryptoServiceConfig, ) -> InitResult<SymmetricCryptoService> { SymmetricCryptoService::from_config(config) .await .map_err(Into::into) } #[instrument(name = "sdf.init.initialize_layer_db", level = "info", skip_all)] pub(crate) async fn initialize_layer_db( config: LayerDbConfig, compute_executor: DedicatedExecutor, token: CancellationToken, ) -> InitResult<(DalLayerDb, LayerDbGracefulShutdown)> { LayerDb::from_config(config, compute_executor, token) .await .map_err(Into::into) } #[instrument( name = "sdf.init.load_jwt_public_signing_key", level = "info", skip_all )] pub(crate) async fn load_jwt_public_signing_key( primary: JwtConfig, secondary: Option<JwtConfig>, ) -> InitResult<JwtPublicSigningKeyChain> { Ok(JwtPublicSigningKeyChain::from_config(primary, secondary).await?) } pub(crate) fn initialize_posthog( config: &PosthogConfig, token: CancellationToken, ) -> InitResult<(PosthogSender, PosthogClient)> { si_posthog::from_config(config, token).map_err(Into::into) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server