Skip to main content
Glama
change_set_processor_task.rs27.4 kB
use std::{ future::{ Future, IntoFuture, }, io, result, sync::Arc, time::{ Duration, Instant, }, }; use dal::DalContextBuilder; use frigg::FriggStore; use futures::TryStreamExt; use naxum::{ MessageHead, ServiceBuilder, ServiceExt as _, TowerServiceExt as _, extract::MatchedSubject, handler::Handler as _, middleware::{ matched_subject::{ ForSubject, MatchedSubjectLayer, }, post_process::PostProcessLayer, trace::TraceLayer, }, response::{ IntoResponse, Response, }, }; use si_data_nats::{ NatsClient, async_nats::jetstream::consumer::push, }; use si_events::{ ChangeSetId, WorkspacePk, }; use telemetry::prelude::*; use telemetry_utils::metric; use thiserror::Error; use tokio::{ sync::{ Notify, watch, }, time, }; use tokio_stream::StreamExt as _; use tokio_util::{ sync::CancellationToken, task::TaskTracker, }; use self::app_state::AppState; use crate::{ ServerMetadata, api_types::change_set_request::{ ChangeSetRequest, CompressedChangeSetRequest, }, compressing_stream::CompressingStream, updates::EddaUpdates, }; #[remain::sorted] #[derive(Debug, Error)] pub(crate) enum ChangeSetProcessorTaskError { /// When a naxum-based service encounters an I/O error #[error("naxum error: {0}")] Naxum(#[source] std::io::Error), } type Error = ChangeSetProcessorTaskError; type Result<T> = result::Result<T, ChangeSetProcessorTaskError>; pub(crate) struct ChangeSetProcessorTask { _metadata: Arc<ServerMetadata>, workspace_id: WorkspacePk, change_set_id: ChangeSetId, inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>, } impl ChangeSetProcessorTask { const NAME: &'static str = "edda_server::change_set_processor_task"; #[allow(clippy::too_many_arguments)] pub(crate) fn create( metadata: Arc<ServerMetadata>, nats: NatsClient, incoming: CompressingStream<push::Ordered, ChangeSetRequest, CompressedChangeSetRequest>, frigg: FriggStore, edda_updates: EddaUpdates, parallel_build_limit: usize, workspace_id: WorkspacePk, change_set_id: ChangeSetId, ctx_builder: DalContextBuilder, quiescent_period: Duration, quiesced_notify: Arc<Notify>, quiesced_token: CancellationToken, last_compressing_heartbeat_rx: watch::Receiver<Instant>, task_token: CancellationToken, server_tracker: TaskTracker, ) -> Self { let connection_metadata = nats.metadata_clone(); let prefix = nats.metadata().subject_prefix().map(|s| s.to_owned()); let state = AppState::new( workspace_id, change_set_id, nats, frigg, edda_updates, parallel_build_limit, ctx_builder, server_tracker, ); // Set up a check interval that ideally fires more often than the quiescent period let check_interval = time::interval(quiescent_period.checked_div(10).unwrap_or(quiescent_period)); let captured = QuiescedCaptured { instance_id_str: metadata.instance_id().to_string().into_boxed_str(), workspace_id_str: workspace_id.to_string().into_boxed_str(), change_set_id_str: change_set_id.to_string().into_boxed_str(), quiesced_notify: quiesced_notify.clone(), last_compressing_heartbeat_rx, }; let inactive_aware_incoming = incoming // Frequency at which we check for a quiet period .timeout_repeating(check_interval) // Fire quiesced_notify which triggers a specific shutdown of the serial dvu task where // we *know* we want to remove the task from the set of work. .inspect_err(move |_elapsed| { let QuiescedCaptured { instance_id_str, workspace_id_str, change_set_id_str, quiesced_notify, last_compressing_heartbeat_rx, } = &captured; let last_heartbeat_elapsed = last_compressing_heartbeat_rx.borrow().elapsed(); debug!( service.instance.id = instance_id_str, si.workspace.id = workspace_id_str, si.change_set.id = change_set_id_str, last_heartbeat_elapsed = last_heartbeat_elapsed.as_secs(), quiescent_period = quiescent_period.as_secs(), ); if last_heartbeat_elapsed > quiescent_period { debug!( service.instance.id = instance_id_str, si.workspace.id = workspace_id_str, si.change_set.id = change_set_id_str, "rate of requests has become inactive, triggering a quiesced shutdown", ); // Notify the serial dvu task that we want to shutdown due to a quiet period quiesced_notify.notify_one(); } }) // Continue processing messages as normal until the Naxum app's graceful shutdown is // triggered. This means we turn the stream back from a stream of // `Result<Result<Message, _>, Elapsed>` into `Result<Message, _>` .filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok()); let app = ServiceBuilder::new() .layer(MatchedSubjectLayer::new().for_subject( EddaChangeSetRequestsForSubject::with_prefix(prefix.as_deref()), )) .layer( TraceLayer::new() .make_span_with( telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(), ) .on_response(telemetry_nats::NatsOnResponse::new()), ) .layer(PostProcessLayer::new()) .service(handlers::default.with_state(state)) .map_response(Response::into_response); let inner = naxum::serve_with_incoming_limit(inactive_aware_incoming, app.into_make_service(), 1) .with_graceful_shutdown(graceful_shutdown_signal(task_token, quiesced_token)); let inner_fut = inner.into_future(); Self { _metadata: metadata, workspace_id, change_set_id, inner: Box::new(inner_fut), } } pub(crate) async fn try_run(self) -> Result<()> { self.inner.await.map_err(Error::Naxum)?; metric!(counter.change_set_processor_task.change_set_task = -1); debug!( task = Self::NAME, si.workspace.id = %self.workspace_id, si.change_set.id = %self.change_set_id, "shutdown complete", ); Ok(()) } } struct QuiescedCaptured { instance_id_str: Box<str>, workspace_id_str: Box<str>, change_set_id_str: Box<str>, quiesced_notify: Arc<Notify>, last_compressing_heartbeat_rx: watch::Receiver<Instant>, } #[derive(Clone, Debug)] struct EddaChangeSetRequestsForSubject { prefix: Option<()>, } impl EddaChangeSetRequestsForSubject { fn with_prefix(prefix: Option<&str>) -> Self { Self { prefix: prefix.map(|_p| ()), } } } impl<R> ForSubject<R> for EddaChangeSetRequestsForSubject where R: MessageHead, { fn call(&mut self, req: &mut naxum::Message<R>) { let mut parts = req.subject().split('.'); match self.prefix { Some(_) => { if let ( Some(prefix), Some(p1), Some(p2), Some(p3), Some(_workspace_id), Some(_change_set_id), None, ) = ( parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), ) { let matched = format!("{prefix}.{p1}.{p2}.{p3}.:workspace_id.:change_set_id"); req.extensions_mut().insert(MatchedSubject::from(matched)); }; } None => { if let ( Some(p1), Some(p2), Some(p3), Some(_workspace_id), Some(_change_set_id), None, ) = ( parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), ) { let matched = format!("{p1}.{p2}.{p3}.:workspace_id.:change_set_id"); req.extensions_mut().insert(MatchedSubject::from(matched)); }; } } } } // Await either a graceful shutdown signal from the task or an inactive request stream trigger. async fn graceful_shutdown_signal( task_token: CancellationToken, quiescence_token: CancellationToken, ) { tokio::select! { _ = task_token.cancelled() => {} _ = quiescence_token.cancelled() => {} } } mod handlers { use std::{ collections::BTreeSet, result, }; use dal::{ ChangeSet, ChangeSetId, DalContext, Schema, Ulid, WorkspacePk, WorkspaceSnapshotAddress, }; use frigg::FriggStore; use naxum::{ Json, extract::State, response::{ IntoResponse, Response, }, }; use ringmap::RingMap; use si_data_nats::Subject; use si_events::{ change_batch::ChangeBatchAddress, workspace_snapshot::{ Change, EntityKind, }, }; use telemetry::prelude::*; use telemetry_utils::metric; use thiserror::Error; use super::app_state::AppState; use crate::{ api_types::change_set_request::CompressedChangeSetRequest, materialized_view, updates::EddaUpdates, }; #[remain::sorted] #[derive(Debug, Error)] pub(crate) enum HandlerError { #[error("change batch not found: {0}")] ChangeBatchNotFound(ChangeBatchAddress), /// Failures related to ChangeSets #[error("Change set error: {0}")] ChangeSet(#[from] dal::ChangeSetError), #[error("compute executor error: {0}")] ComputeExecutor(#[from] dal::DedicatedExecutorError), /// When failing to create a DAL context #[error("error creating a dal ctx: {0}")] DalTransactions(#[from] dal::TransactionsError), #[error("frigg error: {0}")] Frigg(#[from] frigg::Error), #[error("layerdb error: {0}")] LayerDb(#[from] si_layer_cache::LayerDbError), #[error("materialized view error: {0}")] MaterializedView(#[from] materialized_view::MaterializedViewError), #[error("schema error: {0}")] Schema(#[from] dal::SchemaError), /// When failing to find the workspace #[error("workspace error: {0}")] Workspace(#[from] dal::WorkspaceError), /// When failing to do an operation using the [`WorkspaceSnapshot`] #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] dal::WorkspaceSnapshotError), /// When failing to send a [`WsEvent`] #[error("failed to construct ws event: {0}")] WsEvent(#[from] dal::WsEventError), } type Result<T> = result::Result<T, HandlerError>; impl IntoResponse for HandlerError { fn into_response(self) -> Response { metric!(counter.change_set_processor_task.failed_rebase = 1); // TODO(fnichol): there are different responses, esp. for expected interrupted error!(si.error.message = ?self, "failed to process message"); Response::default_internal_server_error() } } pub(crate) async fn default( State(state): State<AppState>, subject: Subject, Json(request): Json<CompressedChangeSetRequest>, ) -> Result<()> { let AppState { workspace_id, change_set_id, nats: _, frigg, edda_updates, parallel_build_limit, ctx_builder, server_tracker: _, } = state; let ctx = ctx_builder .build_for_change_set_as_system(workspace_id, change_set_id, None) .await?; let span = current_span_for_instrument_at!("info"); if !span.is_disabled() { span.record("si.workspace.id", workspace_id.to_string()); span.record("si.change_set.id", change_set_id.to_string()); } let to_process_change_set = ChangeSet::get_by_id(&ctx, change_set_id).await?; // We should skip any change sets that are not active // Active in this case is open, needs approval status, approved or rejected if !to_process_change_set.status.is_active() { debug!("Attempted to process a non-active change set. Skipping"); return Ok(()); } process_request( &ctx, &frigg, &edda_updates, parallel_build_limit, subject, workspace_id, change_set_id, request, ) .await } #[instrument( name = "edda.requests.change_set.process", level = "info", skip_all, fields( otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, si.workspace.id = %workspace_id, si.change_set.id = %change_set_id, si.edda.compressed_request.kind = request.as_ref(), si.edda.src_requests.count = request.src_requests_count(), ) )] #[allow(clippy::too_many_arguments)] async fn process_request( ctx: &DalContext, frigg: &FriggStore, edda_updates: &EddaUpdates, parallel_build_limit: usize, subject: Subject, workspace_id: WorkspacePk, change_set_id: ChangeSetId, request: CompressedChangeSetRequest, ) -> Result<()> { let span = current_span_for_instrument_at!("info"); let otel_name = { let mut parts = subject.as_str().split('.'); match ( parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), parts.next(), ) { (Some(p1), Some(p2), Some(p3), Some(_workspace_id), Some(_change_set_id), None) => { format!("{p1}.{p2}.{p3}.:workspace_id.:change_set_id process") } _ => format!("{} process", subject.as_str()), } }; span.record("messaging.destination", subject.as_str()); span.record("otel.name", otel_name.as_str()); match request { CompressedChangeSetRequest::NewChangeSet { src_requests_count: _, base_change_set_id: _, new_change_set_id: _, to_snapshot_address, change_batch_addresses, } => { let index_was_copied = materialized_view::try_reuse_mv_index_for_new_change_set( ctx, frigg, edda_updates, to_snapshot_address, ) .await .map_err(|err| span.record_err(err))?; if index_was_copied { process_incremental_updates( ctx, frigg, edda_updates, parallel_build_limit, change_set_id, to_snapshot_address, // both snapshot addrs will use `to` to_snapshot_address, // both snapshot addrs will use `to` change_batch_addresses, ) .await } // If we couldn't copy the index successfully, fall back to rebuild else { materialized_view::build_all_mv_for_change_set( ctx, frigg, edda_updates, parallel_build_limit, None, "explicit rebuild", ) .await .map_err(Into::into) } } CompressedChangeSetRequest::Rebuild { .. } => { // Rebuild materialized_view::build_all_mv_for_change_set( ctx, frigg, edda_updates, parallel_build_limit, None, "explicit rebuild", ) .await .map_err(Into::into) } CompressedChangeSetRequest::RebuildChangedDefinitions { .. } => { // Rebuild only change set MVs with outdated definition checksums // Since we're not processing incremental changes, use build_all_mv_for_change_set // which will detect and rebuild only outdated definitions materialized_view::build_all_mv_for_change_set( ctx, frigg, edda_updates, parallel_build_limit, None, "selective rebuild based on definition checksums", ) .await .map_err(Into::into) } CompressedChangeSetRequest::Update { src_requests_count: _, from_snapshot_address, to_snapshot_address, change_batch_addresses, } => { // Index exists if frigg .get_change_set_index(workspace_id, change_set_id) .await .map_err(|err| span.record_err(err))? .is_some() { // Always use unified approach that deduplicates work between explicit updates and changed definitions let mut changes = Vec::new(); // Load all change batches and concatenate all changes from all batches for change_batch_address in change_batch_addresses { let change_batch = ctx .layer_db() .change_batch() .read_wait_for_memory(&change_batch_address) .await .map_err(|err| span.record_err(err))? .ok_or(HandlerError::ChangeBatchNotFound(change_batch_address))?; changes.extend_from_slice(change_batch.changes()); } changes = deduplicate_changes(changes); post_process_changes(ctx, &mut changes).await?; // build_mv_for_changes_in_change_set now automatically combines // explicit changes and outdated definitions materialized_view::build_mv_for_changes_in_change_set( ctx, frigg, edda_updates, parallel_build_limit, change_set_id, from_snapshot_address, to_snapshot_address, &changes, ) .await .map_err(Into::into) } // Index does not exist else { // todo: this is where we'd handle reusing an index from another change set if // the snapshots match! let build_reason = "initial build with changed definitions"; materialized_view::build_all_mv_for_change_set( ctx, frigg, edda_updates, parallel_build_limit, None, build_reason, ) .await .map_err(Into::into) } } } .inspect(|_| span.record_ok()) .map_err(|err| span.record_err(err)) } /// In order to have correct materialized views, sometimes a change in one thing /// means we should report a change in some other thing, even though that thing /// has not actually changed. For example, if an overlay function has been added /// to a schema, we need to recalculate the materialized views for the schema /// variants under that schema. #[instrument( level = "info", name = "edda.requests.change_set.process.post_process_changes", skip_all )] async fn post_process_changes(ctx: &DalContext, changes: &mut Vec<Change>) -> Result<()> { let mut overlay_category_changed = false; let mut changed_schemas = BTreeSet::new(); let mut changed_variants = BTreeSet::new(); for change in changes.iter() { match change.entity_kind { EntityKind::CategoryOverlay => { overlay_category_changed = true; } EntityKind::Schema => { let id: Ulid = change.entity_id.into(); changed_schemas.insert(id); } EntityKind::SchemaVariant => { let id: Ulid = change.entity_id.into(); changed_variants.insert(id); } _ => {} } } if overlay_category_changed { for changed_schema_id in changed_schemas { let variant_ids = Schema::list_schema_variant_ids(ctx, changed_schema_id.into()).await?; for variant_id in variant_ids { let variant_ulid: Ulid = variant_id.into(); if changed_variants.contains(&variant_ulid) { continue; } let merkle_tree_hash = ctx .workspace_snapshot()? .get_node_weight(variant_id) .await? .merkle_tree_hash(); changes.push(Change { entity_id: variant_ulid.into(), entity_kind: EntityKind::SchemaVariant, merkle_tree_hash, }); } } } Ok(()) } #[allow(clippy::too_many_arguments)] async fn process_incremental_updates( ctx: &DalContext, frigg: &FriggStore, edda_updates: &EddaUpdates, parallel_build_limit: usize, change_set_id: ChangeSetId, from_snapshot_address: WorkspaceSnapshotAddress, to_snapshot_address: WorkspaceSnapshotAddress, change_batch_addresses: Vec<ChangeBatchAddress>, ) -> Result<()> { let mut changes = Vec::new(); // Load all change batches and concatenate all changes from all batches for change_batch_address in change_batch_addresses { let change_batch = ctx .layer_db() .change_batch() .read_wait_for_memory(&change_batch_address) .await? .ok_or(HandlerError::ChangeBatchNotFound(change_batch_address))?; changes.extend_from_slice(change_batch.changes()); } changes = deduplicate_changes(changes); post_process_changes(ctx, &mut changes).await?; materialized_view::build_mv_for_changes_in_change_set( ctx, frigg, edda_updates, parallel_build_limit, change_set_id, from_snapshot_address, to_snapshot_address, &changes, ) .await .map_err(Into::into) } fn deduplicate_changes(changes: Vec<Change>) -> Vec<Change> { let map: RingMap<_, _> = changes .into_iter() .map(|change| { ( (change.entity_kind, change.entity_id), change.merkle_tree_hash, ) }) .collect(); map.into_iter() .map(|((entity_kind, entity_id), merkle_tree_hash)| Change { entity_id, entity_kind, merkle_tree_hash, }) .collect() } } mod app_state { //! Application state for a change set processor. use dal::DalContextBuilder; use frigg::FriggStore; use si_data_nats::NatsClient; use si_events::{ ChangeSetId, WorkspacePk, }; use tokio_util::task::TaskTracker; use crate::updates::EddaUpdates; /// Application state. #[derive(Clone, Debug)] pub(crate) struct AppState { /// Workspace ID for the task pub(crate) workspace_id: WorkspacePk, /// Change set ID for the task pub(crate) change_set_id: ChangeSetId, /// NATS client #[allow(dead_code)] pub(crate) nats: NatsClient, /// Frigg store pub(crate) frigg: FriggStore, /// Publishes patch and index update messages pub(crate) edda_updates: EddaUpdates, /// Parallelism limit for materialized view builds pub(crate) parallel_build_limit: usize, /// DAL context builder for each processing request pub(crate) ctx_builder: DalContextBuilder, /// A task tracker for server-level tasks that can outlive the lifetime of a change set /// processor task #[allow(dead_code)] pub(crate) server_tracker: TaskTracker, } impl AppState { /// Creates a new [`AppState`]. #[allow(clippy::too_many_arguments)] pub(crate) fn new( workspace_id: WorkspacePk, change_set_id: ChangeSetId, nats: NatsClient, frigg: FriggStore, edda_updates: EddaUpdates, parallel_build_limit: usize, ctx_builder: DalContextBuilder, server_tracker: TaskTracker, ) -> Self { Self { workspace_id, change_set_id, nats, frigg, edda_updates, parallel_build_limit, ctx_builder, server_tracker, } } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server