Skip to main content
Glama
server.rs14.4 kB
use core::fmt; use std::{ future::{ Future, IntoFuture, }, io, sync::Arc, time::Duration, }; use dal::{ DalContext, DalLayerDb, DedicatedExecutor, JetstreamStreams, JobQueueProcessor, NatsProcessor, ServicesContext, feature_flags::FeatureFlagService, }; use edda_client::EddaClient; use nats_dead_letter_queue::DeadLetterQueue; use naxum::{ Message, MessageHead, ServiceBuilder, ServiceExt as _, TowerServiceExt as _, extract::MatchedSubject, handler::Handler as _, middleware::{ ack::AckLayer, matched_subject::{ ForSubject, MatchedSubjectLayer, }, trace::{ OnRequest, TraceLayer, }, }, response::{ IntoResponse, Response, }, }; use pinga_client::PingaClient; use rebaser_client::RebaserClient; use rebaser_core::nats; use si_crypto::{ SymmetricCryptoService, SymmetricCryptoServiceConfig, VeritechCryptoConfig, VeritechEncryptionKey, }; use si_data_nats::{ NatsClient, NatsConfig, async_nats, jetstream, }; use si_data_pg::{ PgPool, PgPoolConfig, }; use telemetry::prelude::*; use telemetry_utils::metric; use tokio_util::{ sync::CancellationToken, task::TaskTracker, }; use veritech_client::Client as VeritechClient; use crate::{ Config, Error, Features, Result, app_state::AppState, handlers, }; const TASKS_CONSUMER_NAME: &str = "rebaser-tasks"; // Send ack pending messages more often than the default 20 seconds const TASKS_PROGRESS_PERIOD: Duration = Duration::from_secs(10); // Increase the default ack wait from 30 to 60 seconds const TASKS_ACK_WAIT: Duration = Duration::from_secs(60); /// Server metadata, used with telemetry. #[derive(Clone, Debug)] pub struct ServerMetadata { instance_id: String, } impl ServerMetadata { /// Returns the server's unique instance id. pub fn instance_id(&self) -> &str { &self.instance_id } } /// A service which concurrently processes rebaser requests across multiple change sets. /// /// Each change set is processed with a dedicated micro service (a [`ChangeSetRequestsTask`]) which /// consumes a work queue of rebaser requests. The work queue semantics allows multiple Rebaser /// server instances to run and share work while collectively processing one message at a time, /// thus preserving the total order of requests within the scope of a chanage set. /// /// An activity stream of all rebaser requests is processed in the main loop to determine if new /// change set tasks should be spawned. pub struct Server { metadata: Arc<ServerMetadata>, inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>, server_tracker: TaskTracker, } impl fmt::Debug for Server { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Server") .field("metadata", &self.metadata) .finish() } } impl Server { /// Creates a runnable [`Server`] from configuration. #[instrument(name = "rebaser.init.from_config", level = "info", skip_all)] pub async fn from_config( config: Config, shutdown_token: CancellationToken, layer_db_tracker: &TaskTracker, layer_db_token: CancellationToken, ) -> Result<Self> { dal::init()?; let encryption_key = Self::load_encryption_key(config.crypto().clone()).await?; let nats = Self::connect_to_nats(config.nats()).await?; let jetstream_streams = JetstreamStreams::new(nats.clone()).await?; let pg_pool = Self::create_pg_pool(config.pg_pool()).await?; let rebaser = Self::create_rebaser_client(nats.clone()).await?; let veritech = Self::create_veritech_client(nats.clone()); let job_processor = Self::create_job_processor(nats.clone()).await?; let symmetric_crypto_service = Self::create_symmetric_crypto_service(config.symmetric_crypto_service()).await?; let compute_executor = Self::create_compute_executor()?; let (layer_db, layer_db_graceful_shutdown) = DalLayerDb::from_config( config.layer_db_config().clone(), compute_executor.clone(), layer_db_token.clone(), ) .await?; layer_db_tracker.spawn(layer_db_graceful_shutdown.into_future()); let services_context = ServicesContext::new( pg_pool, nats.clone(), jetstream_streams, job_processor, rebaser, veritech.clone(), encryption_key, None, None, symmetric_crypto_service, layer_db, FeatureFlagService::default(), compute_executor, ); Self::from_services( config.instance_id().to_string(), config.concurrency_limit(), services_context, config.quiescent_period(), shutdown_token, config.features(), ) .await } /// Creates a runnable [`Server`] from pre-configured and pre-created services. #[instrument(name = "rebaser.init.from_services", level = "info", skip_all)] pub async fn from_services( instance_id: impl Into<String>, concurrency_limit: Option<usize>, services_context: ServicesContext, quiescent_period: Duration, shutdown_token: CancellationToken, features: Features, ) -> Result<Self> { let metadata = Arc::new(ServerMetadata { instance_id: instance_id.into(), }); let nats = services_context.nats_conn().clone(); let context = jetstream::new(nats.clone()); let prefix = nats.metadata().subject_prefix().map(|s| s.to_owned()); let connection_metadata = nats.metadata_clone(); let tasks = nats::rebaser_tasks_jetstream_stream(&context) .await? .create_consumer(Self::rebaser_tasks_consumer_config(prefix.as_deref())) .await? .messages() .await?; let requests_stream = nats::rebaser_requests_jetstream_stream(&context).await?; let dead_letter_queue = DeadLetterQueue::create_stream(context.clone()).await?; let pinga = PingaClient::new(nats.clone()).await?; let edda = EddaClient::new(nats.clone()).await?; let ctx_builder = DalContext::builder(services_context, false); let server_tracker = TaskTracker::new(); let state = AppState::new( metadata.clone(), nats, pinga, edda, requests_stream, dead_letter_queue, ctx_builder, quiescent_period, shutdown_token.clone(), server_tracker.clone(), features, ); let app = ServiceBuilder::new() .layer( MatchedSubjectLayer::new() .for_subject(RebaserTasksForSubject::with_prefix(prefix.as_deref())), ) .layer( TraceLayer::new() .make_span_with( telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(), ) .on_request(RebaserOnRequest) .on_response(telemetry_nats::NatsOnResponse::new()), ) .layer(AckLayer::new().progress_period(TASKS_PROGRESS_PERIOD)) .service(handlers::default.with_state(state)) .map_response(Response::into_response); let inner = match concurrency_limit { Some(concurrency_limit) => Box::new( naxum::serve_with_incoming_limit(tasks, app.into_make_service(), concurrency_limit) .with_graceful_shutdown(naxum::wait_on_cancelled(shutdown_token.clone())) .into_future(), ), None => Box::new( naxum::serve(tasks, app.into_make_service()) .with_graceful_shutdown(naxum::wait_on_cancelled(shutdown_token.clone())) .into_future(), ), }; Ok(Self { metadata, inner, server_tracker, }) } /// Runs the service to completion or until the first internal error is encountered. #[inline] pub async fn run(self) { if let Err(err) = self.try_run().await { error!(error = ?err, "error while running rebaser main loop"); } } /// Runs the service to completion, returning its result (i.e. whether it successful or an /// internal error was encountered). pub async fn try_run(self) -> Result<()> { self.inner.await.map_err(Error::Naxum)?; info!("rebaser inner loop exited, now shutting down the server tracker's tasks"); self.server_tracker.close(); self.server_tracker.wait().await; info!("rebaser main loop shutdown complete"); Ok(()) } #[inline] fn rebaser_tasks_consumer_config( subject_prefix: Option<&str>, ) -> async_nats::jetstream::consumer::pull::Config { async_nats::jetstream::consumer::pull::Config { ack_wait: TASKS_ACK_WAIT, durable_name: Some(TASKS_CONSUMER_NAME.to_owned()), filter_subject: nats::subject::tasks_incoming(subject_prefix).to_string(), ..Default::default() } } #[instrument(name = "rebaser.init.load_encryption_key", level = "info", skip_all)] async fn load_encryption_key( crypto_config: VeritechCryptoConfig, ) -> Result<Arc<VeritechEncryptionKey>> { Ok(Arc::new( VeritechEncryptionKey::from_config(crypto_config) .await .map_err(Error::CycloneEncryptionKey)?, )) } #[instrument(name = "rebaser.init.connect_to_nats", level = "info", skip_all)] async fn connect_to_nats(nats_config: &NatsConfig) -> Result<NatsClient> { let client = NatsClient::new(nats_config).await?; debug!("successfully connected nats client"); Ok(client) } #[instrument(name = "rebaser.init.create_pg_pool", level = "info", skip_all)] async fn create_pg_pool(pg_pool_config: &PgPoolConfig) -> Result<PgPool> { let pool = PgPool::new(pg_pool_config) .await .map_err(Error::dal_pg_pool)?; debug!("successfully started pg pool (note that not all connections may be healthy)"); Ok(pool) } #[instrument(name = "rebaser.init.create_rebaser_client", level = "info", skip_all)] async fn create_rebaser_client(nats: NatsClient) -> Result<RebaserClient> { let client = RebaserClient::new(nats).await?; debug!("successfully initialized the rebaser client"); Ok(client) } #[instrument(name = "rebaser.init.create_veritech_client", level = "info", skip_all)] fn create_veritech_client(nats: NatsClient) -> VeritechClient { VeritechClient::new(nats) } #[instrument(name = "rebaser.init.create_job_processor", level = "info", skip_all)] async fn create_job_processor( nats: NatsClient, ) -> Result<Box<dyn JobQueueProcessor + Send + Sync>> { Ok(Box::new(NatsProcessor::new(nats).await?) as Box<dyn JobQueueProcessor + Send + Sync>) } #[instrument( name = "rebaser.init.create_symmetric_crypto_service", level = "info", skip_all )] async fn create_symmetric_crypto_service( config: &SymmetricCryptoServiceConfig, ) -> Result<SymmetricCryptoService> { SymmetricCryptoService::from_config(config) .await .map_err(Into::into) } #[instrument( name = "rebaser.init.create_compute_executor", level = "info", skip_all )] fn create_compute_executor() -> Result<DedicatedExecutor> { dal::compute_executor("rebaser").map_err(Into::into) } } #[derive(Clone, Debug)] struct RebaserOnRequest; impl<R> OnRequest<R> for RebaserOnRequest where R: MessageHead, { fn on_request(&mut self, req: &Message<R>, _span: &Span) { debug!(task = req.subject().as_str(), "starting task"); metric!(counter.change_set_processor_task.change_set_task = 1); } } #[derive(Clone, Debug)] struct RebaserTasksForSubject { prefix: Option<()>, } impl RebaserTasksForSubject { fn with_prefix(prefix: Option<&str>) -> Self { Self { prefix: prefix.map(|_p| ()), } } } impl<R> ForSubject<R> for RebaserTasksForSubject where R: MessageHead, { fn call(&mut self, req: &mut naxum::Message<R>) { let mut parts = req.subject().split('.'); match self.prefix { Some(_) => { if let ( Some(prefix), Some(p1), Some(p2), Some(_workspace_id), Some(_change_set_id), Some(kind), None, ) = ( parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), ) { let matched = format!("{prefix}.{p1}.{p2}.:workspace_id.:change_set_id.{kind}"); req.extensions_mut().insert(MatchedSubject::from(matched)); }; } None => { if let ( Some(p1), Some(p2), Some(_workspace_id), Some(_change_set_id), Some(kind), None, ) = ( parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), ) { let matched = format!("{p1}.{p2}.:workspace_id.:change_set_id.{kind}"); req.extensions_mut().insert(MatchedSubject::from(matched)); }; } } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server