Skip to main content
Glama
change_set.rs42.7 kB
use std::{ collections::HashSet, str::FromStr, sync::Arc, time::Duration, }; use chrono::{ DateTime, Utc, }; use serde::{ Deserialize, Serialize, }; use si_data_pg::{ PgError, PgRow, }; use si_db::{ HistoryActor, HistoryEvent, User, change_set::FIND_ANCESTORS_QUERY, }; use si_events::{ RebaseBatchAddressKind, WorkspaceSnapshotAddress, audit_log::AuditLogKind, merkle_tree_hash::MerkleTreeHash, ulid::Ulid, workspace_snapshot::Checksum, }; use si_id::{ ActionId, EntityId, UserPk, WorkspacePk, }; use si_layer_cache::LayerDbError; use telemetry::prelude::*; use thiserror::Error; use tokio::time; use crate::{ ChangeSetStatus, ComponentError, DalContext, Func, FuncError, Schema, SchemaError, SchemaVariant, SchemaVariantError, TransactionsError, Workspace, WorkspaceError, WorkspaceSnapshot, WorkspaceSnapshotError, WsEvent, WsEventError, action::ActionError, billing_publish::{ self, BillingPublishError, }, slow_rt::SlowRuntimeError, workspace_snapshot::{ DependentValueRoot, dependent_value_root::DependentValueRootError, graph::RebaseBatch, selector::WorkspaceSnapshotSelectorDiscriminants, split_snapshot::{ SplitRebaseBatchVCurrent, SplitSnapshot, }, }, }; pub mod approval; pub mod event; pub mod status; pub mod view; #[remain::sorted] #[derive(Debug, Error)] pub enum ChangeSetError { #[error("billing publish error: {0}")] BillingPublish(#[from] Box<BillingPublishError>), #[error("cannot rename HEAD change set")] CantRenameHeadChangeSet, #[error("change set not approved for apply. Current state: {0}")] ChangeSetNotApprovedForApply(ChangeSetStatus), #[error("change set with id {0} not found")] ChangeSetNotFound(ChangeSetId), #[error("default change set {0} has no workspace snapshot pointer")] DefaultChangeSetNoWorkspaceSnapshotPointer(ChangeSetId), #[error("dependent value root error: {0}")] DependentValueRoot(#[from] DependentValueRootError), #[error("dvu roots are not empty for change set: {0}")] DvuRootsNotEmpty(ChangeSetId), #[error("enum parse error: {0}")] EnumParse(#[from] strum::ParseError), #[error("func error: {0}")] Func(#[from] Box<FuncError>), #[error("invalid user actor pk")] InvalidActor(UserPk), #[error("invalid user system init")] InvalidUserSystemInit, #[error("tokio join error: {0}")] Join(#[from] tokio::task::JoinError), #[error("layer db error: {0}")] LayerDb(#[from] Box<LayerDbError>), #[error("ulid monotonic error: {0}")] Monotonic(#[from] ulid::MonotonicError), #[error("mutex error: {0}")] Mutex(String), #[error("Changeset {0} does not have a base change set")] NoBaseChangeSet(ChangeSetId), #[error("no tenancy set in context")] NoTenancySet, #[error("no workspace_pk is set for change_set_id={0}")] NoWorkspacePkSet(ChangeSetId), #[error("Changeset {0} does not have a workspace snapshot")] NoWorkspaceSnapshot(ChangeSetId), #[error("pg error: {0}")] Pg(#[from] Box<PgError>), #[error("rebaser client error: {0}")] RebaserClient(#[from] Box<rebaser_client::ClientError>), #[error("schema error: {0}")] Schema(#[from] Box<SchemaError>), #[error("schema variant error: {0}")] SchemaVariant(#[from] Box<SchemaVariantError>), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("si db error: {0}")] SiDb(#[from] Box<si_db::Error>), #[error("slow runtime error: {0}")] SlowRuntime(#[from] SlowRuntimeError), #[error("timeout out waiting for dvu after {0}ms")] TimedOutAwaitingDvu(u64), #[error("transactions error: {0}")] Transactions(#[from] Box<TransactionsError>), #[error("ulid decode error: {0}")] UlidDecode(#[from] ulid::DecodeError), #[error( "found an unexpected number of open change sets matching default change set (should be one, found {0:?})" )] UnexpectedNumberOfOpenChangeSetsMatchingDefaultChangeSet(Vec<ChangeSetId>), #[error("workspace error: {0}")] Workspace(#[from] Box<WorkspaceError>), #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] Box<WorkspaceSnapshotError>), #[error("ws event error: {0}")] WsEvent(#[from] Box<WsEventError>), } impl From<LayerDbError> for ChangeSetError { fn from(value: LayerDbError) -> Self { Box::new(value).into() } } impl From<si_db::Error> for ChangeSetError { fn from(value: si_db::Error) -> Self { Box::new(value).into() } } impl From<PgError> for ChangeSetError { fn from(value: PgError) -> Self { Box::new(value).into() } } impl From<rebaser_client::ClientError> for ChangeSetError { fn from(value: rebaser_client::ClientError) -> Self { Box::new(value).into() } } impl From<TransactionsError> for ChangeSetError { fn from(value: TransactionsError) -> Self { Box::new(value).into() } } impl From<WorkspaceError> for ChangeSetError { fn from(value: WorkspaceError) -> Self { Box::new(value).into() } } impl From<WsEventError> for ChangeSetError { fn from(value: WsEventError) -> Self { Box::new(value).into() } } impl From<ActionError> for ChangeSetApplyError { fn from(value: ActionError) -> Self { Box::new(value).into() } } impl From<ChangeSetError> for ChangeSetApplyError { fn from(value: ChangeSetError) -> Self { Box::new(value).into() } } impl From<ComponentError> for ChangeSetApplyError { fn from(value: ComponentError) -> Self { Box::new(value).into() } } impl From<si_db::Error> for ChangeSetApplyError { fn from(value: si_db::Error) -> Self { Box::new(value).into() } } impl From<TransactionsError> for ChangeSetApplyError { fn from(value: TransactionsError) -> Self { Box::new(value).into() } } /// The primary result type for this module. pub type ChangeSetResult<T> = Result<T, ChangeSetError>; /// A superset of [`ChangeSetError`] used when performing apply logic. #[remain::sorted] #[derive(Debug, Error)] pub enum ChangeSetApplyError { #[error("action error: {0}")] Action(#[from] Box<ActionError>), #[error("action prototype not found for id: {0}")] ActionPrototypeNotFound(ActionId), #[error("Cannot apply changeset {0} to itself")] CannotApplyToItself(ChangeSetId), #[error("change set error: {0}")] ChangeSet(#[from] Box<ChangeSetError>), #[error("component error: {0}")] Component(#[from] Box<ComponentError>), #[error("invalid user: {0}")] InvalidUser(UserPk), #[error("invalid user system init")] InvalidUserSystemInit, #[error("change set ({0}) does not have a base change set")] NoBaseChangeSet(ChangeSetId), #[error("si db error: {0}")] SiDb(#[from] Box<si_db::Error>), #[error("transactions error: {0}")] Transactions(#[from] Box<TransactionsError>), } /// A superset of [`ChangeSetResult`] used when performing apply logic. pub type ChangeSetApplyResult<T> = Result<T, ChangeSetApplyError>; pub use si_id::ChangeSetId; #[derive(Clone, Serialize, Deserialize)] pub struct ChangeSet { pub id: ChangeSetId, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, pub name: String, pub status: ChangeSetStatus, pub base_change_set_id: Option<ChangeSetId>, pub workspace_snapshot_address: WorkspaceSnapshotAddress, pub workspace_id: Option<WorkspacePk>, pub merge_requested_by_user_id: Option<UserPk>, pub merge_requested_at: Option<DateTime<Utc>>, pub reviewed_by_user_id: Option<UserPk>, pub reviewed_at: Option<DateTime<Utc>>, pub created_by_user_id: Option<UserPk>, } impl TryFrom<PgRow> for ChangeSet { type Error = ChangeSetError; fn try_from(value: PgRow) -> Result<Self, Self::Error> { let status_string: String = value.try_get("status")?; let status = ChangeSetStatus::try_from(status_string.as_str())?; Ok(Self { id: value.try_get("id")?, created_at: value.try_get("created_at")?, updated_at: value.try_get("updated_at")?, name: value.try_get("name")?, status, base_change_set_id: value.try_get("base_change_set_id")?, workspace_snapshot_address: value.try_get("workspace_snapshot_address")?, workspace_id: value.try_get("workspace_id")?, merge_requested_by_user_id: value.try_get("merge_requested_by_user_id")?, merge_requested_at: value.try_get("merge_requested_at")?, reviewed_by_user_id: value.try_get("reviewed_by_user_id")?, reviewed_at: value.try_get("reviewed_at")?, created_by_user_id: value.try_get("created_by_user_id")?, }) } } impl ChangeSet { pub async fn new( ctx: &DalContext, name: impl AsRef<str>, base_change_set_id: Option<ChangeSetId>, workspace_snapshot_address: WorkspaceSnapshotAddress, ) -> ChangeSetResult<Self> { let id: Ulid = Ulid::new(); let change_set_id: ChangeSetId = id.into(); let workspace_id = ctx.tenancy().workspace_pk_opt(); let user_id = match ctx.history_actor() { HistoryActor::User(user_pk) => Some(user_pk), _ => None, }; let name = name.as_ref(); let row = ctx .txns() .await? .pg() .query_one( "INSERT INTO change_set_pointers (id, name, base_change_set_id, status, workspace_id, workspace_snapshot_address, created_by_user_id) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *", &[&change_set_id, &name, &base_change_set_id, &ChangeSetStatus::Open.to_string(), &workspace_id, &workspace_snapshot_address, &user_id], ) .await?; let change_set = Self::try_from(row)?; let _history_event = HistoryEvent::new( ctx, "change_set.create", "Change Set created", &serde_json::to_value(&change_set)?, ) .await?; Ok(change_set) } pub async fn fork_head(ctx: &DalContext, name: impl AsRef<str>) -> ChangeSetResult<Self> { let workspace_pk = ctx.workspace_pk()?; let workspace = Workspace::get_by_pk(ctx, workspace_pk).await?; let head = workspace.default_change_set(ctx).await?; let change_set = ChangeSet::new(ctx, name, Some(head.id), head.workspace_snapshot_address).await?; Ok(change_set) } pub async fn into_frontend_type( &self, ctx: &DalContext, ) -> ChangeSetResult<si_frontend_types::ChangeSet> { let merge_requested_by_user = if let Some(merge_requested_by) = self.merge_requested_by_user_id { User::get_by_pk_opt(ctx, merge_requested_by) .await? .map(|user| { if user.name().is_empty() { user.email().clone() } else { user.name().clone() } }) } else { None }; let reviewed_by_user = if let Some(reviewed_by) = self.reviewed_by_user_id { User::get_by_pk_opt(ctx, reviewed_by).await?.map(|user| { if user.name().is_empty() { user.email().clone() } else { user.name().clone() } }) } else { None }; let change_set = si_frontend_types::ChangeSet { created_at: self.created_at, id: self.id, updated_at: self.updated_at, name: self.name.clone(), status: self.status.into(), base_change_set_id: self.base_change_set_id, workspace_id: self.workspace_id.map_or("".to_owned(), |id| id.to_string()), merge_requested_by_user_id: self.merge_requested_by_user_id.map(|s| s.to_string()), merge_requested_by_user, merge_requested_at: self.merge_requested_at, reviewed_by_user_id: self.reviewed_by_user_id.map(|id| id.into()), reviewed_by_user, reviewed_at: self.reviewed_at, created_by_user_id: self.created_by_user_id.map(|id| id.into()), }; Ok(change_set) } pub async fn update_workspace_id( &mut self, ctx: &DalContext, workspace_id: WorkspacePk, ) -> ChangeSetResult<()> { ctx.txns() .await? .pg() .query_none( "UPDATE change_set_pointers SET workspace_id = $2, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&self.id, &workspace_id], ) .await?; self.workspace_id = Some(workspace_id); Ok(()) } pub fn workspace_id(&self) -> ChangeSetResult<WorkspacePk> { self.workspace_id.ok_or(ChangeSetError::NoTenancySet) } async fn workspace(&self, ctx: &DalContext) -> ChangeSetResult<Workspace> { Ok(Workspace::get_by_pk(ctx, self.workspace_id()?).await?) } pub async fn is_head(&self, ctx: &DalContext) -> ChangeSetResult<bool> { Ok(self.workspace(ctx).await?.default_change_set_id() == self.id) } #[instrument(name = "change_set.update_pointer", level = "debug", skip_all)] pub async fn update_pointer( &mut self, ctx: &DalContext, workspace_snapshot_address: WorkspaceSnapshotAddress, ) -> ChangeSetResult<()> { let old_snapshot_address = self.workspace_snapshot_address; // Update change set pointer ctx.txns() .await? .pg() .query_none( "UPDATE change_set_pointers SET workspace_snapshot_address = $2, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&self.id, &workspace_snapshot_address], ) .await?; // Record when old snapshot was last used ctx.txns() .await? .pg() .query_none( "INSERT INTO snapshot_last_used (snapshot_id, last_used_at, created_at) VALUES ($1, CLOCK_TIMESTAMP(), CLOCK_TIMESTAMP()) ON CONFLICT (snapshot_id) DO UPDATE SET last_used_at = CLOCK_TIMESTAMP()", &[&old_snapshot_address.to_string()], ) .await?; self.workspace_snapshot_address = workspace_snapshot_address; Ok(()) } pub async fn update_status( &mut self, ctx: &DalContext, status: ChangeSetStatus, ) -> ChangeSetResult<()> { ctx.txns() .await? .pg() .query_none( "UPDATE change_set_pointers SET status = $2, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&self.id, &status.to_string()], ) .await?; self.status = status; billing_publish::for_change_set_status_update(ctx, self) .await .map_err(Box::new)?; Ok(()) } pub async fn request_change_set_approval(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { let user_pk = Self::extract_userid_from_context_or_error(ctx).await?; let status = ChangeSetStatus::NeedsApproval; ctx.txns() .await? .pg() .query_none( "UPDATE change_set_pointers SET merge_requested_by_user_id = $2, merge_requested_at = CLOCK_TIMESTAMP(), status = $3, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&self.id, &user_pk, &status.to_string()], ) .await?; self.status = status; Ok(()) } /// Set the status to Open, and clear any reviewed/merge requested info pub async fn reopen_change_set(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { let status = ChangeSetStatus::Open; ctx.txns() .await? .pg() .query_none( "UPDATE change_set_pointers SET reviewed_by_user_id = NULL, reviewed_at = NULL, merge_requested_by_user_id = NULL, merge_requested_at = NULL, status = $2, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&self.id, &status.to_string()], ) .await?; self.status = status; Ok(()) } /// First, transitions the status of the [`ChangeSet`] to [`ChangeSetStatus::NeedsApproval`] /// then [`ChangeSetStatus::Approved`]. Next, checks if DVU Roots still exist. Finally, /// lock every [`SchemaVariant`] and [`Func`] that is currently unlocked pub async fn prepare_for_force_apply(ctx: &DalContext) -> ChangeSetResult<()> { // first change the status to approved and who did it let mut change_set = ChangeSet::get_by_id(ctx, ctx.change_set_id()).await?; change_set.request_change_set_approval(ctx).await?; // then approve it change_set.approve_change_set_for_apply(ctx).await?; // then do the rest Self::prepare_for_apply(ctx).await } /// First, checks if DVU Roots still exist. Next, ensures the [`ChangeSet`] has an /// [`ChangeSetStatus::Approved`]. Finally, /// lock every [`SchemaVariant`] and [`Func`] that is currently unlocked pub async fn prepare_for_apply(ctx: &DalContext) -> ChangeSetResult<()> { Self::prepare_for_apply_inner(ctx, false).await } /// This is a copy of [Self::prepare_for_apply], but skips the status check. This is because /// sdf now handles the approvals flow as part of the fine grained access control work (i.e. /// SpiceDB is intentionally not accessible in the DAL). pub async fn prepare_for_apply_without_status_check(ctx: &DalContext) -> ChangeSetResult<()> { Self::prepare_for_apply_inner(ctx, true).await } // TODO(nick): now that the fine grained access control flag is gone, we can collapse the two // outer methods and chase down tests continuing to use the old method. async fn prepare_for_apply_inner( ctx: &DalContext, dangerous_skip_status_check: bool, ) -> ChangeSetResult<()> { // Ensure that DVU roots are empty before continuing. if DependentValueRoot::roots_exist(ctx).await? { // TODO(nick): we should consider requiring this check in integration tests too. Why did I // not do this at the time of writing? Tests have multiple ways to call "apply", whether // its via helpers or through the change set methods directly. In addition, they test // for success and failure, not solely for success. We should still do this, but not within // the PR corresponding to when this message was written. return Err(ChangeSetError::DvuRootsNotEmpty(ctx.change_set_id())); } // WARNING(nick): we should only skip this status check if using sdf's protected apply logic. if !dangerous_skip_status_check { // if the change set status isn't approved, we shouldn't go // locking stuff let change_set = ChangeSet::get_by_id(ctx, ctx.change_set_id()).await?; if change_set.status != ChangeSetStatus::Approved { return Err(ChangeSetError::ChangeSetNotApprovedForApply( change_set.status, )); } } // Lock all unlocked variants for schema_id in Schema::list_ids(ctx).await.map_err(Box::new)? { let schema = Schema::get_by_id(ctx, schema_id).await.map_err(Box::new)?; let Some(variant) = SchemaVariant::get_unlocked_for_schema(ctx, schema_id) .await .map_err(Box::new)? else { continue; }; let variant_id = variant.id(); variant.lock(ctx).await.map_err(Box::new)?; schema .set_default_variant_id(ctx, variant_id) .await .map_err(Box::new)?; } // Lock all unlocked functions too for func in Func::list_for_default_and_editing(ctx) .await .map_err(Box::new)? { if !func.is_locked { func.lock(ctx).await.map_err(Box::new)?; } } Ok(()) } pub async fn approve_change_set_for_apply(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { let user_pk = Self::extract_userid_from_context_or_error(ctx).await?; let status = ChangeSetStatus::Approved; ctx.txns() .await? .pg() .query_none( "UPDATE change_set_pointers SET reviewed_by_user_id = $2, reviewed_at = CLOCK_TIMESTAMP(), status = $3, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&self.id, &user_pk, &status.to_string()], ) .await?; self.status = status; Ok(()) } pub async fn reject_change_set_for_apply(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { let user_pk = Self::extract_userid_from_context_or_error(ctx).await?; let status = ChangeSetStatus::Rejected; ctx.txns() .await? .pg() .query_none( "UPDATE change_set_pointers SET reviewed_by_user_id = $2, reviewed_at = CLOCK_TIMESTAMP(), status = $3, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&self.id, &user_pk, &status.to_string()], ) .await?; self.status = status; Ok(()) } /// Finds a [`ChangeSet`] across all workspaces, ignoring the provided [`WorkspacePk`] on the /// current [`DalContext`] pub async fn get_by_id_across_workspaces( ctx: &DalContext, change_set_id: ChangeSetId, ) -> ChangeSetResult<Self> { Self::find_across_workspaces(ctx, change_set_id) .await? .ok_or_else(|| ChangeSetError::ChangeSetNotFound(change_set_id)) } /// Finds a [`ChangeSet`] across all workspaces, ignoring the provided [`WorkspacePk`] on the /// current [`DalContext`] #[instrument( name = "change_set.find_across_workspaces", level = "debug", skip_all, fields( si.change_set.id = %change_set_id, si.workspace.id = Empty, ), )] pub async fn find_across_workspaces( ctx: &DalContext, change_set_id: ChangeSetId, ) -> ChangeSetResult<Option<Self>> { let span = current_span_for_instrument_at!("debug"); let row = ctx .txns() .await? .pg() .query_opt( "SELECT * FROM change_set_pointers WHERE id = $1", &[&change_set_id], ) .await?; match row { Some(row) => { let change_set = Self::try_from(row)?; if let Some(workspace_id) = change_set.workspace_id { span.record("si.workspace.id", workspace_id.to_string()); } Ok(Some(change_set)) } None => Ok(None), } } /// Get a change set within the [`WorkspacePk`] set for the current [`DalContext`] pub async fn get_by_id(ctx: &DalContext, change_set_id: ChangeSetId) -> ChangeSetResult<Self> { Self::find(ctx, change_set_id) .await? .ok_or_else(|| ChangeSetError::ChangeSetNotFound(change_set_id)) } /// Find a change set within the [`WorkspacePk`] set for the current [`DalContext`] #[instrument( name = "change_set.find", level = "debug", skip_all, fields( si.change_set.id = %change_set_id, si.workspace.id = Empty, ), )] pub async fn find( ctx: &DalContext, change_set_id: ChangeSetId, ) -> ChangeSetResult<Option<Self>> { let span = current_span_for_instrument_at!("debug"); let workspace_id = ctx.workspace_pk()?; let row = ctx .txns() .await? .pg() .query_opt( "SELECT * FROM change_set_pointers WHERE id = $1 AND workspace_id = $2", &[&change_set_id, &workspace_id], ) .await?; match row { Some(row) => { let change_set = Self::try_from(row)?; if let Some(workspace_id) = change_set.workspace_id { span.record("si.workspace.id", workspace_id.to_string()); } Ok(Some(change_set)) } None => { // warn here so we can see if something is requesting change sets cross workspace warn!( si.workspace.id = %workspace_id, "Change Set Id: {change_set_id} not found for Workspace: {workspace_id}", ); Ok(None) } } } pub async fn list_active(ctx: &DalContext) -> ChangeSetResult<Vec<Self>> { let mut result = vec![]; let rows = ctx .txns() .await? .pg() .query( "SELECT * from change_set_pointers WHERE workspace_id = $1 AND status IN ($2, $3, $4, $5, $6)", &[ &ctx.tenancy().workspace_pk_opt(), &ChangeSetStatus::Open.to_string(), &ChangeSetStatus::NeedsApproval.to_string(), &ChangeSetStatus::NeedsAbandonApproval.to_string(), &ChangeSetStatus::Approved.to_string(), &ChangeSetStatus::Rejected.to_string(), ], ) .await?; for row in rows { result.push(Self::try_from(row)?); } Ok(result) } /// List all change sets that are applied. pub async fn list_all_applied( ctx: &DalContext, workspace_pk: WorkspacePk, ) -> ChangeSetResult<Vec<Self>> { let rows = ctx .txns() .await? .pg() .query( "SELECT * from change_set_pointers WHERE workspace_id = $1 AND status = $2", &[&workspace_pk, &ChangeSetStatus::Applied.to_string()], ) .await?; let mut result = Vec::with_capacity(rows.len()); for row in rows { result.push(Self::try_from(row)?); } Ok(result) } /// List all change sets for a given workspace pub async fn list_all_for_workspace( ctx: &DalContext, workspace_pk: WorkspacePk, ) -> ChangeSetResult<Vec<Self>> { let mut result = vec![]; let rows = ctx .txns() .await? .pg() .query( "SELECT * from change_set_pointers WHERE workspace_id = $1", &[&workspace_pk], ) .await?; for row in rows { result.push(Self::try_from(row)?); } Ok(result) } pub async fn list_active_for_workspace( ctx: &DalContext, workspace_pk: WorkspacePk, ) -> ChangeSetResult<Vec<Self>> { let mut result = vec![]; let rows = ctx .txns() .await? .pg() .query( "SELECT * from change_set_pointers WHERE workspace_id = $1 AND status IN ($2, $3, $4, $5, $6)", &[ &workspace_pk, &ChangeSetStatus::Open.to_string(), &ChangeSetStatus::NeedsApproval.to_string(), &ChangeSetStatus::NeedsAbandonApproval.to_string(), &ChangeSetStatus::Approved.to_string(), &ChangeSetStatus::Rejected.to_string(), ], ) .await?; for row in rows { result.push(Self::try_from(row)?); } Ok(result) } /// Take care when working on these change sets to set the workspace id on the dal context!!! pub async fn list_active_for_all_workspaces(ctx: &DalContext) -> ChangeSetResult<Vec<Self>> { let mut result = vec![]; let rows = ctx .txns() .await? .pg() .query( "SELECT * from change_set_pointers WHERE status IN ($1, $2, $3, 4, 5)", &[ &ChangeSetStatus::Open.to_string(), &ChangeSetStatus::NeedsApproval.to_string(), &ChangeSetStatus::NeedsAbandonApproval.to_string(), &ChangeSetStatus::Approved.to_string(), &ChangeSetStatus::Rejected.to_string(), ], ) .await?; for row in rows { result.push(Self::try_from(row)?); } Ok(result) } /// Applies the current [`ChangeSet`] in the provided [`DalContext`]. [`Actions`](Action) /// are enqueued as needed and only done so if the base [`ChangeSet`] is "HEAD" (i.e. /// the default [`ChangeSet`] of the [`Workspace`]). /// Also sends the relevant WSEvent #[instrument(level = "info", skip_all)] pub async fn apply_to_base_change_set(ctx: &mut DalContext) -> ChangeSetApplyResult<ChangeSet> { let base_change_set_id = ctx.get_workspace_default_change_set_id().await?; if ctx.change_set_id() == base_change_set_id { return Err(ChangeSetApplyError::CannotApplyToItself(base_change_set_id)); } // Apply to the base change with the current change set (non-editing) and commit. let mut change_set_to_be_applied = Self::get_by_id(ctx, ctx.change_set_id()).await?; ctx.update_visibility_and_snapshot_to_visibility(ctx.change_set_id()) .await?; change_set_to_be_applied .apply_to_base_change_set_inner(ctx) .await?; // This is just to send the ws events ctx.blocking_commit_no_rebase().await?; Ok(change_set_to_be_applied) } #[instrument( level = "info", name = "change_set.detect_updates_that_will_be_applied_split", skip_all )] pub async fn detect_updates_that_will_be_applied_split( &self, ctx: &DalContext, ) -> ChangeSetResult<Option<SplitRebaseBatchVCurrent>> { let base_change_set_id = self .base_change_set_id .ok_or(ChangeSetError::NoBaseChangeSet(self.id))?; let base_snapshot = Arc::new( SplitSnapshot::find_for_change_set(ctx, base_change_set_id) .await .map_err(Box::new)?, ); Ok(SplitSnapshot::calculate_rebase_batch( base_snapshot, ctx.workspace_snapshot() .map_err(Box::new)? .as_split_snapshot() .map_err(Box::new)?, ) .await .map_err(Box::new)?) } #[instrument( level = "info", name = "change_set.detect_updates_that_will_be_applied_legacy", skip_all )] pub async fn detect_updates_that_will_be_applied_legacy( &self, ctx: &DalContext, ) -> ChangeSetResult<Option<RebaseBatch>> { let base_change_set_id = self .base_change_set_id .ok_or(ChangeSetError::NoBaseChangeSet(self.id))?; let base_snapshot = Arc::new( WorkspaceSnapshot::find_for_change_set(ctx, base_change_set_id) .await .map_err(Box::new)?, ); Ok(WorkspaceSnapshot::calculate_rebase_batch( base_snapshot, ctx.workspace_snapshot() .map_err(Box::new)? .as_legacy_snapshot() .map_err(Box::new)?, ) .await .map_err(Box::new)?) } /// Applies the current [`ChangeSet`] in the provided [`DalContext`] to its base /// [`ChangeSet`]. This involves performing a rebase request, updating the status /// of the [`ChangeSet`] accordingly, and publishing a WSEvent /// /// This function neither changes the visibility nor the snapshot after performing the /// aforementioned actions. async fn apply_to_base_change_set_inner(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { let workspace_id = self .workspace_id .ok_or(ChangeSetError::NoWorkspacePkSet(self.id))?; let base_change_set_id = self .base_change_set_id .ok_or(ChangeSetError::NoBaseChangeSet(self.id))?; let snapshot_kind: WorkspaceSnapshotSelectorDiscriminants = ctx.workspace_snapshot().map_err(Box::new)?.into(); let maybe_rebase_batch_address = match snapshot_kind { WorkspaceSnapshotSelectorDiscriminants::LegacySnapshot => { if let Some(rebase_batch) = self.detect_updates_that_will_be_applied_legacy(ctx).await? { Some(RebaseBatchAddressKind::Legacy( ctx.write_legacy_rebase_batch(rebase_batch).await?, )) } else { None } } WorkspaceSnapshotSelectorDiscriminants::SplitSnapshot => { if let Some(rebase_batch) = self.detect_updates_that_will_be_applied_split(ctx).await? { Some(RebaseBatchAddressKind::Split( ctx.write_split_snapshot_rebase_batch(rebase_batch).await?, )) } else { None } } }; if let Some(rebase_batch_address) = maybe_rebase_batch_address { let (request_id, reply_fut) = ctx .run_rebase_from_change_set_with_reply( workspace_id, base_change_set_id, rebase_batch_address, self.id, ) .await?; let reply_fut = reply_fut.instrument(info_span!( "rebaser_client.await_response", si.workspace.id = %workspace_id, si.change_set.id = %base_change_set_id, )); // Wait on response from Rebaser after request has processed let timeout = Duration::from_secs(60); let _reply = time::timeout(timeout, reply_fut) .await .map_err(|_elapsed| { TransactionsError::RebaserReplyDeadlineElasped(timeout, request_id) })??; } self.update_status(ctx, ChangeSetStatus::Applied).await?; let user = Self::extract_userid_from_context(ctx).await; WsEvent::change_set_applied(ctx, self.id, base_change_set_id, user) .await? .publish_on_commit(ctx) .await?; Ok(()) } /// Returns a new [`ChangeSetId`](ChangeSet) if a new [`ChangeSet`] was created. /// Also writes an audit log event to head (so we don't have to handle this in every route handler) pub async fn force_new(ctx: &mut DalContext) -> ChangeSetResult<Option<ChangeSetId>> { let maybe_fake_pk = if ctx.change_set_id() == ctx.get_workspace_default_change_set_id().await? { let change_set = Self::fork_head(ctx, Self::generate_name()).await?; ctx.update_visibility_and_snapshot_to_visibility(change_set.id) .await?; ctx.write_audit_log_to_head(AuditLogKind::CreateChangeSet, change_set.name) .await?; WsEvent::change_set_created(ctx, change_set.id, change_set.workspace_snapshot_address) .await? .publish_on_commit(ctx) .await?; Some(change_set.id) } else { None }; Ok(maybe_fake_pk) } fn generate_name() -> String { Utc::now().format("%Y-%m-%d-%H:%M").to_string() } /// Updates the status for a ChangeSet to be [`ChangeSetStatus::Abandoned`] and fires necessary WSEvent pub async fn abandon(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { self.update_status(ctx, ChangeSetStatus::Abandoned).await?; let user_id = Self::extract_userid_from_context(ctx).await; WsEvent::change_set_abandoned(ctx, self.id, user_id) .await? .publish_on_commit(ctx) .await?; Ok(()) } pub async fn extract_userid_from_context(ctx: &DalContext) -> Option<UserPk> { match ctx.history_actor() { HistoryActor::User(user_pk) => { let maybe_user = User::get_by_pk_opt(ctx, *user_pk).await; match maybe_user { Ok(user_option) => user_option.map(|user| user.pk()), Err(_) => None, } } HistoryActor::SystemInit => None, } } pub async fn extract_userid_from_context_or_error(ctx: &DalContext) -> ChangeSetResult<UserPk> { let user_id = match ctx.history_actor() { HistoryActor::User(user_pk) => User::get_by_pk(ctx, *user_pk).await?.pk(), HistoryActor::SystemInit => return Err(ChangeSetError::InvalidUserSystemInit), }; Ok(user_id) } /// Walk the graph of change sets up to the change set that has no "base /// change set id" and return the set. pub async fn ancestors( ctx: &DalContext, change_set_id: ChangeSetId, ) -> ChangeSetResult<HashSet<ChangeSetId>> { let mut result = HashSet::new(); let rows = ctx .txns() .await? .pg() .query(FIND_ANCESTORS_QUERY, &[&change_set_id]) .await?; for row in rows { let id: String = row.get("id"); result.insert(ChangeSetId::from_str(&id)?); } Ok(result) } pub async fn rename_change_set( ctx: &DalContext, change_set_id: ChangeSetId, new_name: &String, ) -> ChangeSetResult<()> { let default_change_set_id = ctx.get_workspace_default_change_set_id().await?; if default_change_set_id == change_set_id { return Err(ChangeSetError::CantRenameHeadChangeSet); } ctx.txns() .await? .pg() .query_none( "UPDATE change_set_pointers SET name = $2, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&change_set_id, new_name], ) .await?; WsEvent::rename_change_set(ctx, change_set_id, new_name.clone()) .await? .publish_on_commit(ctx) .await?; Ok(()) } /// Wait for the changeset's DVUs to be completely processed before continuing /// This func also runs update_snapshot_to_visibility on the passed in ctx /// with_timeout makes the function fail after a minute, since that's also the rebaser timeout. /// On prod code, always pass `true`. `false` is for testing only. pub async fn wait_for_dvu(ctx: &mut DalContext, with_timeout: bool) -> ChangeSetResult<()> { let mut retry_count = 0; const AWAIT_MS: u64 = 25; const MAX_RETRIES: u64 = (60 * 1000) / AWAIT_MS; loop { ctx.update_snapshot_to_visibility().await?; if !DependentValueRoot::roots_exist(ctx).await? { break; } tokio::time::sleep(Duration::from_millis(AWAIT_MS)).await; if with_timeout { retry_count += 1; if retry_count > MAX_RETRIES { return Err(ChangeSetError::TimedOutAwaitingDvu(AWAIT_MS * retry_count)); } } } Ok(()) } } impl std::fmt::Debug for ChangeSet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChangeSet") .field("id", &self.id.to_string()) .field( "base_change_set_id", &self.base_change_set_id.map(|bcsid| bcsid.to_string()), ) .field( "workspace_snapshot_address", &self.workspace_snapshot_address.to_string(), ) .field( "merge_requested_by_user_id", &self .merge_requested_by_user_id .map(|user_pk| user_pk.to_string()), ) .finish() } } /// Calculates the checksum based on a list of IDs with hashes passed in. #[instrument(name = "calculate_checksum", level = "debug", skip_all)] pub async fn calculate_checksum( ctx: &DalContext, mut ids_with_hashes: Vec<(EntityId, MerkleTreeHash)>, ) -> ChangeSetResult<Checksum> { // If an empty list of IDs with hashes wass passed in, then we use the root node's ID and // merkle tree hash as our sole ID and hash so that algorithms using the checksum can // "invalidate" as needed. if ids_with_hashes.is_empty() { let root_node_id = ctx .workspace_snapshot() .map_err(Box::new)? .root() .await .map_err(Box::new)?; let root_node = ctx .workspace_snapshot() .map_err(Box::new)? .get_node_weight(root_node_id) .await .map_err(Box::new)?; ids_with_hashes.push((root_node_id.into(), root_node.merkle_tree_hash())); } // We MUST sort IDs (not hashes) before creating the checksum. This is so that we have // stable checksum calculation. ids_with_hashes.sort_by_key(|(id, _)| *id); // Now that we have strictly ordered IDs with hasesh and there's at least one group // present, we can create the checksum. let mut hasher = Checksum::hasher(); for (_, hash) in ids_with_hashes { hasher.update(hash.as_bytes()); } Ok(hasher.finalize()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server