Skip to main content
Glama
deployment.rs23.2 kB
use std::{ collections::BinaryHeap, sync::Arc, time::Duration, }; use dal::DalContext; use frigg::FriggStore; use si_frontend_mv_types::{ index::deployment::DeploymentMvIndexV2, materialized_view::MaterializedView, object::{ FrontendObject, patch::{ DeploymentIndexUpdate, DeploymentPatchBatch, DeploymentUpdateMeta, }, }, reference::ReferenceKind, }; use telemetry::prelude::*; use tokio::{ task::JoinSet, time::Instant, }; use crate::{ materialized_view::{ BuildMvInnerReturn, BuildMvOp, DeploymentBuildMvTaskResult, MaterializedViewError, MvBuilderResult, build_mv, }, updates::EddaUpdates, }; /// This function builds all Materialized Views (MVs) this deployment environment #[instrument( name = "materialized_view.build_all_mvs_for_deployment", level = "info", skip_all, fields( si.materialized_view.reason = reason_message, si.edda.mv.count = Empty, si.edda.mv.avg_build_elapsed_ms = Empty, si.edda.mv.max_build_elapsed_ms = Empty, si.edda.mv.slowest_kind = Empty, ), )] pub async fn build_all_mvs_for_deployment( ctx: &DalContext, frigg: &FriggStore, edda_updates: &EddaUpdates, parallel_build_limit: usize, reason_message: &'static str, ) -> Result<(), MaterializedViewError> { let span = current_span_for_instrument_at!("info"); info!("Started building all deployment MVs"); // Discover all deployment MVs that need to be built let deployment_tasks = discover_deployment_mvs(ctx).await?; // Use the deployment MV parallel processing system let ( frontend_objects, patches, build_count, build_total_elapsed, build_max_elapsed, build_slowest_mv_kind, ) = build_deployment_mv_inner(ctx, frigg, parallel_build_limit, &deployment_tasks).await?; span.record("si.edda.mv.count", build_count); if build_count > 0 { span.record( "si.edda.mv.avg_build_elapsed_ms", build_total_elapsed.as_millis() / build_count, ); span.record( "si.edda.mv.max_build_elapsed_ms", build_max_elapsed.as_millis(), ); span.record("si.edda.mv.slowest_kind", build_slowest_mv_kind); } // Build the deployment index from all 
the frontend objects let mut index_entries: Vec<_> = frontend_objects.into_iter().map(Into::into).collect(); index_entries.sort(); debug!("index_entries {:?}", index_entries); let mv_index = DeploymentMvIndexV2::new(index_entries); let mv_index_frontend_object = FrontendObject::try_from(mv_index)?; let from_index_checksum = mv_index_frontend_object.checksum.to_owned(); let to_index_checksum = from_index_checksum.clone(); let meta = DeploymentUpdateMeta { from_index_checksum, to_index_checksum, }; let patch_batch = DeploymentPatchBatch::new(meta.clone(), patches); let index_update = DeploymentIndexUpdate::new(meta, mv_index_frontend_object.checksum.to_owned()); // Store the index on frigg frigg .put_deployment_index(&mv_index_frontend_object) .await?; // publish updates edda_updates .publish_deployment_patch_batch(patch_batch) .await?; edda_updates .publish_deployment_index_update(index_update) .await?; info!("Started building all deployment MVs"); Ok(()) } /// This function builds only deployment MVs whose definition checksum is outdated /// by comparing current definition checksums with those stored in the deployment index #[instrument( name = "materialized_view.build_outdated_mvs_for_deployment", level = "info", skip_all, fields( si.materialized_view.reason = reason_message, si.edda.mv.count = Empty, si.edda.mv.avg_build_elapsed_ms = Empty, si.edda.mv.max_build_elapsed_ms = Empty, si.edda.mv.slowest_kind = Empty, si.edda.mv.outdated_count = Empty, si.edda.mv.total_discovered = Empty, ), )] pub async fn build_outdated_mvs_for_deployment( ctx: &DalContext, frigg: &FriggStore, edda_updates: &EddaUpdates, parallel_build_limit: usize, reason_message: &'static str, ) -> Result<(), MaterializedViewError> { let span = current_span_for_instrument_at!("info"); // Get current definition checksums from the registry let current_definition_checksums = si_frontend_mv_types::definition_checksum::materialized_view_definition_checksums(); // Get existing deployment index to 
compare checksums let existing_deployment_index_frontend_object = frigg.get_deployment_index().await?; let existing_deployment_index = match existing_deployment_index_frontend_object { Some((deployment_index_obj, _)) => { // Parse the deployment index to get the stored definition checksums match serde_json::from_value::< si_frontend_mv_types::index::deployment::DeploymentMvIndexVersion, >(deployment_index_obj.data) { Ok(si_frontend_mv_types::index::deployment::DeploymentMvIndexVersion::V1(_)) => { // We don't have enough information to determine which MV kinds have had definition changes. return build_all_mvs_for_deployment( ctx, frigg, edda_updates, parallel_build_limit, "fallback because of index upgrade", ) .await; } Err(_) => { return build_all_mvs_for_deployment( ctx, frigg, edda_updates, parallel_build_limit, "fallback because of index parsing error", ) .await; } Ok(si_frontend_mv_types::index::deployment::DeploymentMvIndexVersion::V2( deployment_index, )) => deployment_index, } } None => { // No existing index, so rebuild everything debug!("No existing deployment index found, rebuilding all MVs"); return build_all_mvs_for_deployment( ctx, frigg, edda_updates, parallel_build_limit, "fallback because of missing index", ) .await; } }; // Discover all deployment MVs that could be built let all_deployment_tasks = discover_deployment_mvs(ctx).await?; span.record("si.edda.mv.total_discovered", all_deployment_tasks.len()); // Filter to only tasks where checksums differ or are missing let outdated_tasks: Vec<_> = all_deployment_tasks .into_iter() .filter(|task| { let mv_kind_str = task.mv_kind.to_string(); let current_checksum = current_definition_checksums.get(&mv_kind_str); let existing_checksum = existing_deployment_index .definition_checksums .get(&mv_kind_str); match (current_checksum, existing_checksum) { (Some(current), Some(existing)) => current != existing, (Some(_), None) => true, // New MV type (None, Some(_)) => false, // MV type no longer exists 
(shouldn't happen but handle gracefully) (None, None) => false, // Neither exists (shouldn't happen) } }) .collect(); span.record("si.edda.mv.outdated_count", outdated_tasks.len()); if outdated_tasks.is_empty() { info!("No outdated deployment MVs found, skipping rebuild"); return Ok(()); } debug!( "Found {} outdated deployment MVs out of {} total: {:?}", outdated_tasks.len(), outdated_tasks.len() + existing_deployment_index.definition_checksums.len(), outdated_tasks .iter() .map(|t| t.mv_kind.to_string()) .collect::<Vec<_>>() ); // Use the deployment MV parallel processing system with filtered tasks let ( frontend_objects, patches, build_count, build_total_elapsed, build_max_elapsed, build_slowest_mv_kind, ) = build_deployment_mv_inner(ctx, frigg, parallel_build_limit, &outdated_tasks).await?; span.record("si.edda.mv.count", build_count); if build_count > 0 { span.record( "si.edda.mv.avg_build_elapsed_ms", build_total_elapsed.as_millis() / build_count, ); span.record( "si.edda.mv.max_build_elapsed_ms", build_max_elapsed.as_millis(), ); span.record("si.edda.mv.slowest_kind", build_slowest_mv_kind); } // Build the deployment index from all the frontend objects (both updated and existing) let mut index_entries: Vec<_> = frontend_objects.into_iter().map(Into::into).collect(); // For MVs that weren't rebuilt, keep their existing entries from the current index let rebuilt_kinds: std::collections::HashSet<_> = outdated_tasks .iter() .map(|task| task.mv_kind.to_string()) .collect(); // Add entries for MVs that weren't rebuilt for entry in existing_deployment_index.mv_list { if !rebuilt_kinds.contains(&entry.kind) { index_entries.push(entry); } } index_entries.sort(); debug!("index_entries {:?}", index_entries); let mv_index = si_frontend_mv_types::index::deployment::DeploymentMvIndexV2::new(index_entries); let mv_index_frontend_object = si_frontend_mv_types::object::FrontendObject::try_from(mv_index)?; let from_index_checksum = mv_index_frontend_object.checksum.to_owned(); 
let to_index_checksum = from_index_checksum.clone(); let meta = si_frontend_mv_types::object::patch::DeploymentUpdateMeta { from_index_checksum, to_index_checksum, }; let patch_batch = si_frontend_mv_types::object::patch::DeploymentPatchBatch::new(meta.clone(), patches); let index_update = si_frontend_mv_types::object::patch::DeploymentIndexUpdate::new( meta, mv_index_frontend_object.checksum.to_owned(), ); // Store the index on frigg frigg .put_deployment_index(&mv_index_frontend_object) .await?; // publish updates edda_updates .publish_deployment_patch_batch(patch_batch) .await?; edda_updates .publish_deployment_index_update(index_update) .await?; Ok(()) } /// A deployment MV task that needs to be built for the deployment environment. /// Unlike changeset MVs, these are not triggered by graph changes but by deployment-scoped /// data that exists outside the workspace graph (e.g., cached modules, global configuration, etc.). #[derive(Debug, Clone, Eq, PartialEq)] pub struct DeploymentMvTask { pub mv_kind: ReferenceKind, pub mv_id: String, pub priority: si_events::materialized_view::BuildPriority, } impl Ord for DeploymentMvTask { fn cmp(&self, other: &Self) -> std::cmp::Ordering { match self.priority.cmp(&other.priority) { std::cmp::Ordering::Equal => { // Within the same priority, order by MV kind and then by ID match self.mv_kind.cmp(&other.mv_kind) { std::cmp::Ordering::Equal => self.mv_id.cmp(&other.mv_id), ord => ord, } } ord => ord, } } } impl PartialOrd for DeploymentMvTask { fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) } } /// Discovers all deployment-scoped MVs that need to be built. /// This function examines deployment-scoped data sources and creates tasks /// for all individual objects and collections that should be built. 
async fn discover_deployment_mvs( ctx: &DalContext, ) -> Result<Vec<DeploymentMvTask>, MaterializedViewError> { use dal::cached_module::CachedModule; let mut tasks = Vec::new(); // Add the existing CachedSchemas collection MV tasks.push(DeploymentMvTask { mv_kind: ReferenceKind::CachedSchemas, mv_id: "cached_schemas".to_string(), // Collection has a fixed ID priority: <si_frontend_mv_types::cached_schemas::CachedSchemas as MaterializedView>::build_priority(), }); // Discover individual CachedSchema and CachedDefaultVariant objects from cached modules let modules = CachedModule::latest_modules(ctx).await?; for module in modules { // Add individual CachedSchema MV task tasks.push(DeploymentMvTask { mv_kind: ReferenceKind::CachedSchema, mv_id: module.schema_id.to_string(), priority: <si_frontend_mv_types::cached_schema::CachedSchema as MaterializedView>::build_priority(), }); // Add CachedDefaultVariant MV task tasks.push(DeploymentMvTask { mv_kind: ReferenceKind::CachedDefaultVariant, mv_id: module.schema_id.to_string(), priority: <si_frontend_mv_types::cached_default_variant::CachedDefaultVariant as MaterializedView>::build_priority(), }); } Ok(tasks) } /// Builds deployment MVs using the same parallel processing pattern as changeset MVs. /// Uses a priority queue and JoinSet for concurrent execution with error handling. 
#[instrument(
    name = "materialized_view.build_deployment_mv_inner",
    level = "debug",
    skip_all
)]
async fn build_deployment_mv_inner(
    ctx: &DalContext,
    frigg: &FriggStore,
    parallel_build_limit: usize,
    deployment_tasks: &[DeploymentMvTask],
) -> Result<BuildMvInnerReturn, MaterializedViewError> {
    // Accumulators for the objects/patches produced by each build task.
    let mut frontend_objects = Vec::new();
    let mut patches = Vec::new();
    // In-flight builds; `queued_mv_builds` is a max-heap, so higher-priority
    // DeploymentMvTasks (per its Ord impl) are popped and spawned first.
    let mut build_tasks = JoinSet::new();
    let mut queued_mv_builds = BinaryHeap::new();
    // Snapshot of the current deployment index, shared (via Arc) by every
    // spawned build task so each can look up the previous object version.
    let maybe_deployment_mv_index =
        Arc::new(frigg.get_deployment_index().await?.map(|result| result.0));

    // Queue all deployment tasks
    for task in deployment_tasks {
        queued_mv_builds.push(task.clone());
    }

    // Build-timing telemetry: total/max elapsed, count, and slowest MV kind.
    let mut build_total_elapsed = Duration::from_nanos(0);
    let mut build_count: u128 = 0;
    let mut build_max_elapsed = Duration::from_nanos(0);
    let mut build_slowest_mv_kind: &str = "N/A";

    loop {
        // If there aren't any queued builds waiting for the concurrency limit then
        // we've finished everything and are able to send off the collected
        // FrontendObjects and ObjectPatches to update the index.
        if queued_mv_builds.is_empty() && build_tasks.is_empty() {
            break;
        }

        // Spawn as many of the queued build tasks as we can, up to the concurrency limit.
        while !queued_mv_builds.is_empty() && build_tasks.len() < parallel_build_limit {
            let Some(DeploymentMvTask { mv_kind, mv_id, .. }) = queued_mv_builds.pop() else {
                // This _really_ shouldn't ever return `None` as we just checked that
                // `queued_mv_builds` is not empty.
                break;
            };

            spawn_deployment_mv_task(
                &mut build_tasks,
                ctx,
                frigg,
                mv_kind,
                mv_id,
                maybe_deployment_mv_index.clone(),
            )
            .await?;
        }

        // Wait for one in-flight build to finish before looping back to spawn
        // more; this keeps at most `parallel_build_limit` tasks running.
        if let Some(join_result) = build_tasks.join_next().await {
            // `join_result?` propagates a JoinError (task panic/cancellation).
            let (kind, mv_id, build_duration, execution_result) = join_result?;
            // Decrement the gauge incremented when the task was spawned.
            telemetry_utils::metric!(counter.edda.mv_build = -1, label = kind.to_string());
            match execution_result {
                Ok((maybe_patch, maybe_frontend_object)) => {
                    // Store deployment objects using deployment storage
                    if let Some(frontend_object) = maybe_frontend_object {
                        frigg.insert_deployment_object(&frontend_object).await?;
                        frontend_objects.push(frontend_object);
                    }
                    if let Some(patch) = maybe_patch {
                        // Deployment MVs don't need streaming patches since they're not changeset-scoped
                        debug!("Deployment Patch: {:?}", patch);
                        patches.push(patch);
                    }

                    build_count += 1;
                    if build_duration > build_max_elapsed {
                        build_max_elapsed = build_duration;
                        build_slowest_mv_kind = kind.into();
                    }
                    build_total_elapsed += build_duration;
                }
                Err(err) => {
                    // First build failure aborts the whole batch; dropping the
                    // JoinSet on return cancels any still-running tasks.
                    error!(name = "deployment_mv_build_error", si.error.message = err.to_string(), kind = %kind, id = %mv_id);
                    return Err(err);
                }
            }
        }
    }

    Ok((
        frontend_objects,
        patches,
        build_count,
        build_total_elapsed,
        build_max_elapsed,
        build_slowest_mv_kind,
    ))
}

/// Spawns a build task for a specific deployment MV.
async fn spawn_deployment_mv_task( build_tasks: &mut JoinSet<DeploymentBuildMvTaskResult>, ctx: &DalContext, frigg: &FriggStore, mv_kind: ReferenceKind, mv_id: String, maybe_deployment_mv_index: Arc<Option<FrontendObject>>, ) -> Result<(), MaterializedViewError> { // Record the currently running number of MV build tasks telemetry_utils::metric!(counter.edda.mv_build = 1, label = mv_kind.to_string()); // Info span for the spawned task let span = info_span!( "si.edda.deployment.mv.build", si.edda.mv.kind = %mv_kind, si.edda.mv.id = %mv_id, ) .or_current(); match mv_kind { ReferenceKind::CachedSchemas => { build_tasks.spawn( build_mv_for_deployment_task( ctx.clone(), frigg.clone(), mv_id, mv_kind, dal_materialized_views::cached::schemas::assemble(ctx.clone()), maybe_deployment_mv_index, ) .instrument(span), ); } ReferenceKind::CachedSchema => { let schema_id: dal::SchemaId = mv_id.parse().map_err(|_| { dal::SchemaError::UninstalledSchemaNotFound(dal::SchemaId::from(ulid::Ulid::nil())) })?; build_tasks.spawn( build_mv_for_deployment_task( ctx.clone(), frigg.clone(), mv_id, mv_kind, dal_materialized_views::cached::schema::assemble(ctx.clone(), schema_id), maybe_deployment_mv_index, ) .instrument(span), ); } ReferenceKind::CachedSchemaVariant => { // Note(Brit): Currently, we only want to expose the latest variant (which is the default) for a schema // which means it's sufficient to only build the DefaultVariant. 
// We'll stop building them first, then we can clean up later return Ok(()); } ReferenceKind::CachedDefaultVariant => { let schema_id: dal::SchemaId = mv_id.parse().map_err(|_| { dal::SchemaError::UninstalledSchemaNotFound(dal::SchemaId::from(ulid::Ulid::nil())) })?; build_tasks.spawn( build_mv_for_deployment_task( ctx.clone(), frigg.clone(), mv_id, mv_kind, dal_materialized_views::cached::schema::variant::default::assemble( ctx.clone(), schema_id, ), maybe_deployment_mv_index, ) .instrument(span), ); } _ => { // This shouldn't happen for deployment MVs, but we'll handle it gracefully warn!("Unexpected MV kind for deployment: {:?}", mv_kind); return Ok(()); } } Ok(()) } /// Builds an MV task specifically for deployment (no Change context needed). async fn build_mv_for_deployment_task<F, T, E>( _ctx: DalContext, frigg: FriggStore, mv_id: String, mv_kind: ReferenceKind, build_mv_future: F, maybe_deployment_mv_index: Arc<Option<FrontendObject>>, ) -> DeploymentBuildMvTaskResult where F: std::future::Future<Output = Result<T, E>> + Send + 'static, T: serde::Serialize + TryInto<FrontendObject> + si_frontend_mv_types::checksum::FrontendChecksum + Send + 'static, E: Into<MaterializedViewError> + Send + 'static, MaterializedViewError: From<E>, MaterializedViewError: From<<T as TryInto<FrontendObject>>::Error>, { debug!(kind = %mv_kind, id = %mv_id, "Building Deployment MV"); let start = Instant::now(); let result = build_mv_for_deployment_task_inner( &frigg, mv_id.clone(), mv_kind, build_mv_future, maybe_deployment_mv_index, ) .await; (mv_kind, mv_id, start.elapsed(), result) } /// Inner function for building deployment MV tasks. 
async fn build_mv_for_deployment_task_inner<F, T, E>( frigg: &FriggStore, mv_id: String, mv_kind: ReferenceKind, build_mv_future: F, maybe_deployment_mv_index: Arc<Option<FrontendObject>>, ) -> MvBuilderResult where F: std::future::Future<Output = Result<T, E>> + Send + 'static, T: serde::Serialize + TryInto<FrontendObject> + si_frontend_mv_types::checksum::FrontendChecksum + Send + 'static, E: Into<MaterializedViewError> + Send + 'static, MaterializedViewError: From<E>, MaterializedViewError: From<<T as TryInto<FrontendObject>>::Error>, { // For deployment MVs, check if object exists in deployment storage let op = { let maybe_previous_version = match frigg .get_current_deployment_object_with_index( mv_kind.into(), &mv_id, (*maybe_deployment_mv_index).clone(), ) .await { Ok(maybe_previous) => maybe_previous, Err(err) => { warn!( "Unable to retreive previous deployment MV version; proceeding without: {err}" ); None } }; if let Some(previous_version) = maybe_previous_version { BuildMvOp::UpdateFrom { checksum: previous_version.checksum, data: previous_version.data, } } else { BuildMvOp::Create } }; build_mv(op, mv_kind, build_mv_future).await }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.