Skip to main content
Glama
action.rs33.1 kB
use std::{ collections::{ HashSet, VecDeque, }, time::Instant, }; use itertools::Itertools; use petgraph::prelude::*; use postgres_types::{ FromSql, ToSql, }; use serde::{ Deserialize, Serialize, }; use si_events::{ audit_log::AuditLogKind, ulid::Ulid, }; use si_id::{ FuncRunId, SchemaVariantId, }; use si_layer_cache::LayerDbError; use strum::{ AsRefStr, Display, EnumIter, EnumString, }; use telemetry::prelude::*; use thiserror::Error; use crate::{ ChangeSetError, ChangeSetId, Component, ComponentError, ComponentId, DalContext, EdgeWeightKind, EdgeWeightKindDiscriminants, Func, FuncError, HelperError, SchemaVariantError, TransactionsError, WorkspaceSnapshotError, WsEvent, WsEventError, WsEventResult, WsPayload, action::{ dependency_graph::ActionDependencyGraph, prototype::{ ActionKind, ActionPrototype, ActionPrototypeError, }, }, attribute::value::{ AttributeValueError, DependentValueGraph, }, func::FuncExecutionPk, implement_add_edge_to, workspace_snapshot::{ DependentValueRoot, dependent_value_root::DependentValueRootError, node_weight::{ ActionNodeWeight, NodeWeight, NodeWeightError, category_node_weight::CategoryNodeKind, }, }, }; pub mod dependency_graph; pub mod prototype; #[remain::sorted] #[derive(Debug, Error)] pub enum ActionError { #[error("action already enqueued: {0}")] ActionAlreadyEnqueued(ActionPrototypeId), #[error("action prototype error: {0}")] ActionPrototype(#[from] Box<ActionPrototypeError>), #[error("attribute prototype error: {0}")] AttributePrototype(#[from] Box<crate::attribute::prototype::AttributePrototypeError>), #[error("attribute prototype argument error: {0}")] AttributePrototypeArgument( #[from] Box<crate::attribute::prototype::argument::AttributePrototypeArgumentError>, ), #[error("AttributeValue error: {0}")] AttributeValue(#[from] Box<AttributeValueError>), #[error("Change Set error: {0}")] ChangeSet(#[from] Box<ChangeSetError>), #[error("Component error: {0}")] Component(#[from] Box<ComponentError>), #[error("component not found for action: {0}")] ComponentNotFoundForAction(ActionId), #[error("dependent value root error: {0}")] DependentValueRoot(#[from] Box<DependentValueRootError>), #[error("func error: {0}")] Func(#[from] Box<FuncError>), #[error("Helper error: {0}")] Helper(#[from] Box<HelperError>), #[error("Layer DB error: {0}")] LayerDb(#[from] LayerDbError), #[error("Node Weight error: {0}")] NodeWeight(#[from] Box<NodeWeightError>), #[error("prototype not found for action: {0}")] PrototypeNotFoundForAction(ActionId), #[error("Schema Variant error: {0}")] SchemaVariant(#[from] SchemaVariantError), #[error("Transactions error: {0}")] Transactions(#[from] Box<TransactionsError>), #[error("Unable to determine kind for action: {0}")] UnableToGetKind(ActionId), #[error("unexpected number of action kind {0} for variant {1}")] UnexpectedNumberOfActionKinds(ActionKind, SchemaVariantId), #[error("unexpected number of {0} actions enqueued for component {1}")] UnexpectedNumberOfActionsEnqueuedForComponent(ActionKind, ComponentId), #[error("Workspace Snapshot error: {0}")] WorkspaceSnapshot(#[from] Box<WorkspaceSnapshotError>), #[error("ws event error: {0}")] WsEvent(#[from] Box<WsEventError>), } impl From<ActionPrototypeError> for ActionError { fn from(value: ActionPrototypeError) -> Self { Box::new(value).into() } } impl From<crate::attribute::prototype::AttributePrototypeError> for ActionError { fn from(value: crate::attribute::prototype::AttributePrototypeError) -> Self { Box::new(value).into() } } impl From<crate::attribute::prototype::argument::AttributePrototypeArgumentError> for ActionError { fn from(value: crate::attribute::prototype::argument::AttributePrototypeArgumentError) -> Self { Box::new(value).into() } } impl From<AttributeValueError> for ActionError { fn from(value: AttributeValueError) -> Self { Box::new(value).into() } } impl From<ChangeSetError> for ActionError { fn from(value: ChangeSetError) -> Self { Box::new(value).into() } } impl From<ComponentError> for ActionError { fn from(value: ComponentError) -> Self { Box::new(value).into() } } impl From<DependentValueRootError> for ActionError { fn from(value: DependentValueRootError) -> Self { Box::new(value).into() } } impl From<FuncError> for ActionError { fn from(value: FuncError) -> Self { Box::new(value).into() } } impl From<HelperError> for ActionError { fn from(value: HelperError) -> Self { Box::new(value).into() } } impl From<NodeWeightError> for ActionError { fn from(value: NodeWeightError) -> Self { Box::new(value).into() } } impl From<TransactionsError> for ActionError { fn from(value: TransactionsError) -> Self { Box::new(value).into() } } impl From<WorkspaceSnapshotError> for ActionError { fn from(value: WorkspaceSnapshotError) -> Self { Box::new(value).into() } } impl From<WsEventError> for ActionError { fn from(value: WsEventError) -> Self { Box::new(value).into() } } pub type ActionResult<T> = Result<T, ActionError>; pub use si_events::ActionState; pub use si_id::{ ActionId, ActionPrototypeId, }; /// The completion status of a [`ActionRunner`] /// /// NOTE: This type is only here for backwards comppatibility /// TODO(fnichol): delete this when it's time #[remain::sorted] #[derive( Deserialize, Serialize, AsRefStr, Display, EnumIter, EnumString, Debug, Clone, Copy, PartialEq, Eq, ToSql, FromSql, )] #[serde(rename_all = "camelCase")] #[strum(serialize_all = "camelCase")] pub enum ActionCompletionStatus { Error, Failure, Success, Unstarted, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] pub struct Action { id: ActionId, state: ActionState, originating_changeset_id: ChangeSetId, // DEPRECATED func_execution_pk: Option<FuncExecutionPk>, } impl From<ActionNodeWeight> for Action { fn from(value: ActionNodeWeight) -> Self { Self { id: value.id().into(), state: value.state(), originating_changeset_id: value.originating_change_set_id(), func_execution_pk: None, } } } impl Action { pub fn id(&self) -> ActionId { self.id } pub fn state(&self) -> ActionState { self.state } pub fn originating_changeset_id(&self) -> ChangeSetId { self.originating_changeset_id } implement_add_edge_to!( source_id: ActionId, destination_id: ComponentId, add_fn: add_edge_to_component, discriminant: EdgeWeightKindDiscriminants::Use, result: ActionResult, ); implement_add_edge_to!( source_id: ActionId, destination_id: ActionPrototypeId, add_fn: add_edge_to_action_prototype, discriminant: EdgeWeightKindDiscriminants::Use, result: ActionResult, ); // Even though we're using `implement_add_edge_to`, we're not creating an edge from Self *TO* // the Category node. We're adding an edge *FROM* the Category node to Self. implement_add_edge_to!( source_id: Ulid, destination_id: ActionId, add_fn: add_incoming_category_edge, discriminant: EdgeWeightKindDiscriminants::Use, result: ActionResult, ); /// Returns whether or not any Actions were dispatched. pub async fn dispatch_actions( ctx: &DalContext, concurrency_max: Option<usize>, ) -> ActionResult<bool> { let span = current_span_for_instrument_at!("info"); let concurrency_max = concurrency_max.unwrap_or(usize::MAX); let mut actions_dispatched = 0; let mut did_dispatch = false; // Only dispatch actions if capacity exists. Too many actions running at // once can overwhelm the rebaser for the HEAD change set. let dispatched_count = Action::dispatched_count(ctx).await?; if dispatched_count >= concurrency_max { return Ok(false); } for dispatchable_ation_id in Action::eligible_to_dispatch(ctx).await? { Action::dispatch_action(ctx, dispatchable_ation_id).await?; did_dispatch = true; actions_dispatched += 1; if actions_dispatched + dispatched_count >= concurrency_max { break; } } span.record("si.rebase.actions_dispatched", actions_dispatched); span.record( "si.rebase.dispatched_actions_total", actions_dispatched + dispatched_count, ); Ok(did_dispatch) } pub async fn find_for_component_id( ctx: &DalContext, component_id: ComponentId, ) -> ActionResult<Vec<ActionId>> { let mut actions = vec![]; let snap = ctx.workspace_snapshot()?; let action_category_id = snap .get_category_node_or_err(CategoryNodeKind::Action) .await?; for action_idx in snap .outgoing_targets_for_edge_weight_kind( action_category_id, EdgeWeightKindDiscriminants::Use, ) .await? { let action_id: ActionId = snap .get_node_weight(action_idx) .await? .get_action_node_weight()? .id() .into(); if Self::component_id(ctx, action_id).await? == Some(component_id) { actions.push(action_id); } } Ok(actions) } #[instrument(level = "info", skip_all)] pub async fn remove_all_for_component_id( ctx: &DalContext, component_id: ComponentId, ) -> ActionResult<()> { let actions_to_remove = Self::find_for_component_id(ctx, component_id).await?; for action_id in actions_to_remove { Self::remove_by_id(ctx, action_id).await?; } Ok(()) } pub async fn find_for_states_and_component_id( ctx: &DalContext, component_id: ComponentId, action_states: Vec<ActionState>, ) -> ActionResult<Vec<ActionId>> { let mut actions = vec![]; let actions_for_component = Self::find_for_component_id(ctx, component_id).await?; for action_id in actions_for_component { let action = Self::get_by_id(ctx, action_id).await?; if action_states.contains(&action.state()) { actions.push(action_id); } } Ok(actions) } pub async fn find_for_kind_and_component_id( ctx: &DalContext, component_id: ComponentId, action_kind: ActionKind, ) -> ActionResult<Vec<ActionId>> { let actions_for_component = Self::find_for_component_id(ctx, component_id).await?; let mut actions = vec![]; for action_id in actions_for_component { let action_prototype_id = Self::prototype_id(ctx, action_id).await?; let action_prototype = ActionPrototype::get_by_id(ctx, action_prototype_id).await?; if action_prototype.kind == action_kind { actions.push(action_id); } } Ok(actions) } pub async fn find_equivalent( ctx: &DalContext, action_prototype_id: ActionPrototypeId, maybe_component_id: Option<ComponentId>, ) -> ActionResult<Option<ActionId>> { let snap = ctx.workspace_snapshot()?; let action_category_id = snap .get_category_node_or_err(CategoryNodeKind::Action) .await?; for action_idx in snap .outgoing_targets_for_edge_weight_kind( action_category_id, EdgeWeightKindDiscriminants::Use, ) .await? { let action_id: ActionId = snap .get_node_weight(action_idx) .await? .get_action_node_weight()? .id() .into(); if Self::component_id(ctx, action_id).await? == maybe_component_id && Self::prototype_id(ctx, action_id).await? == action_prototype_id { // we found the equivalent! return Ok(Some(action_id)); } } Ok(None) } pub async fn get_by_id(ctx: &DalContext, id: ActionId) -> ActionResult<Self> { let action: Self = ctx .workspace_snapshot()? .get_node_weight(id) .await? .get_action_node_weight()? .into(); Ok(action) } #[instrument(level = "info", skip_all, fields(si.action.id = ?id, si.action.state = ?state))] pub async fn set_state(ctx: &DalContext, id: ActionId, state: ActionState) -> ActionResult<()> { let node_weight = ctx .workspace_snapshot()? .get_node_weight(id) .await? .get_action_node_weight()?; let mut new_node_weight = node_weight.clone(); new_node_weight.set_state(state); ctx.workspace_snapshot()? .add_or_replace_node(NodeWeight::Action(new_node_weight)) .await?; Ok(()) } #[deprecated(note = "no longer tracking this")] pub async fn set_func_execution_pk( _ctx: &DalContext, _id: ActionId, _pk: Option<FuncExecutionPk>, ) -> ActionResult<()> { unimplemented!("You should never be setting func_execution_pk; bug!"); } /// Enqueues a new action and publishes a WSEvent #[instrument(level = "info", skip(ctx))] pub async fn new( ctx: &DalContext, action_prototype_id: ActionPrototypeId, maybe_component_id: Option<ComponentId>, ) -> ActionResult<Self> { let new_id: ActionId = ctx.workspace_snapshot()?.generate_ulid().await?.into(); let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?; let originating_change_set_id = ctx.change_set_id(); let node_weight = NodeWeight::new_action(originating_change_set_id, new_id.into(), lineage_id); ctx.workspace_snapshot()? .add_or_replace_node(node_weight) .await?; let action_category_id = ctx .workspace_snapshot()? .get_category_node_or_err(CategoryNodeKind::Action) .await?; Self::add_incoming_category_edge( ctx, action_category_id, new_id, EdgeWeightKind::new_use(), ) .await?; Self::add_edge_to_action_prototype( ctx, new_id, action_prototype_id, EdgeWeightKind::new_use(), ) .await?; if let Some(component_id) = maybe_component_id { Self::add_edge_to_component(ctx, new_id, component_id, EdgeWeightKind::new_use()) .await?; } let new_action: Self = ctx .workspace_snapshot()? .get_node_weight(new_id) .await? .get_action_node_weight()? .into(); WsEvent::action_list_updated(ctx) .await? .publish_on_commit(ctx) .await?; Ok(new_action) } pub async fn remove_by_id(ctx: &DalContext, action_id: ActionId) -> ActionResult<()> { ctx.workspace_snapshot()? .remove_node_by_id(action_id) .await?; Ok(()) } pub async fn remove_by_prototype_and_component( ctx: &DalContext, action_prototype_id: ActionPrototypeId, maybe_component_id: Option<ComponentId>, ) -> ActionResult<()> { let snap = ctx.workspace_snapshot()?; if let Some(action_id) = Self::find_equivalent(ctx, action_prototype_id, maybe_component_id).await? { snap.remove_node_by_id(action_id).await?; } Ok(()) } /// Sort the dependency graph of [`Actions`][Action] topologically, breaking ties by listing /// [`Actions`][Action] sorted by their ID (oldest first thanks to ULID sorting). #[instrument(level = "debug", skip_all)] pub async fn list_topologically(ctx: &DalContext) -> ActionResult<Vec<ActionId>> { let mut action_dependency_graph = ActionDependencyGraph::for_workspace(ctx).await?; let mut result = Vec::with_capacity(action_dependency_graph.remaining_actions().len()); // TODO: Grab all "running" & "failed" Actions to list first? loop { let mut independent_actions = action_dependency_graph.independent_actions(); if independent_actions.is_empty() { break; } independent_actions.sort(); result.reserve(independent_actions.len()); for action_id in independent_actions { action_dependency_graph.remove_action(action_id); result.push(action_id); } } // If there is a cycle in the dependencies for some reason, we still want to know that the // actions exist, even though they'll never start executing. let mut actions_in_cycle = action_dependency_graph.remaining_actions(); actions_in_cycle.sort(); result.append(&mut actions_in_cycle); Ok(result) } pub async fn prototype_id( ctx: &DalContext, action_id: ActionId, ) -> ActionResult<ActionPrototypeId> { for (_, _tail_node_idx, head_node_idx) in ctx .workspace_snapshot()? .edges_directed_for_edge_weight_kind( action_id, Outgoing, EdgeWeightKindDiscriminants::Use, ) .await? { if let NodeWeight::ActionPrototype(node_weight) = ctx .workspace_snapshot()? .get_node_weight(head_node_idx) .await? { return Ok(node_weight.id().into()); } } Err(ActionError::PrototypeNotFoundForAction(action_id)) } pub async fn prototype(ctx: &DalContext, action_id: ActionId) -> ActionResult<ActionPrototype> { let prototype_id = Self::prototype_id(ctx, action_id).await?; Ok(ActionPrototype::get_by_id(ctx, prototype_id).await?) } pub async fn component_id( ctx: &DalContext, action_id: ActionId, ) -> ActionResult<Option<ComponentId>> { for (_, _tail_node_idx, head_node_idx) in ctx .workspace_snapshot()? .edges_directed_for_edge_weight_kind( action_id, Outgoing, EdgeWeightKindDiscriminants::Use, ) .await? { if let NodeWeight::Component(component_node_weight) = ctx .workspace_snapshot()? .get_node_weight(head_node_idx) .await? { return Ok(Some(component_node_weight.id().into())); } } Ok(None) } pub async fn component( ctx: &DalContext, action_id: ActionId, ) -> ActionResult<Option<Component>> { match Self::component_id(ctx, action_id).await? { Some(component_id) => Ok(Some(Component::get_by_id(ctx, component_id).await?)), None => Ok(None), } } /// Returns the number of dispatched (or running) actions. pub async fn dispatched_count(ctx: &DalContext) -> ActionResult<usize> { let mut count = 0; let action_category_node_index = ctx .workspace_snapshot()? .get_category_node_or_err(CategoryNodeKind::Action) .await?; let action_edges = ctx .workspace_snapshot()? .edges_directed(action_category_node_index, Outgoing) .await?; for (_, _, action_id) in action_edges { let action = ctx.workspace_snapshot()?.get_node_weight(action_id).await?; if action.get_action_node_weight()?.is_dispatched_or_running() { count += 1; } } Ok(count) } pub async fn all_ids(ctx: &DalContext) -> ActionResult<Vec<ActionId>> { let action_category_node_index = ctx .workspace_snapshot()? .get_category_node_or_err(CategoryNodeKind::Action) .await?; let action_edges = ctx .workspace_snapshot()? .edges_directed(action_category_node_index, Outgoing) .await?; let mut result = Vec::with_capacity(action_edges.len()); for (_, _, action_id) in action_edges { result.push(action_id.into()); } Ok(result) } /// For a given [ActionId] and [ActionDependencyGraph], determine which (if any) actions /// are influencing the current action's hold status. /// For example, the given Action might be queued, but if it's dependent on an action that is failed or on /// hold, we will not dispatch the action #[instrument( name = "action.get_hold_status_influenced_by", level = "debug", skip(ctx, action_dependency_graph) )] pub async fn get_hold_status_influenced_by( ctx: &DalContext, action_dependency_graph: &ActionDependencyGraph, for_action_id: ActionId, ) -> ActionResult<Vec<ActionId>> { let mut reasons_for_hold = vec![]; let mut seen_list = HashSet::new(); let mut work_queue = VecDeque::from(action_dependency_graph.direct_dependencies_of(for_action_id)); while let Some(action_id) = work_queue.pop_front() { let act = Self::get_by_id(ctx, action_id).await?; match act.state() { ActionState::Failed | ActionState::OnHold => reasons_for_hold.push(act.id()), _ => (), } seen_list.insert(action_id); for direct_dependency in action_dependency_graph.direct_dependencies_of(action_id) { if !seen_list.contains(&direct_dependency) { work_queue.push_back(direct_dependency); } } } Ok(reasons_for_hold) } /// An Action is dispatchable if all of the following are true: /// * The action is in the state [`ActionState::Queued`](ActionState) /// * The graph of values for `DependentValuesUpdate` does *NOT* include /// *ANY* [`AttributeValue`s](AttributeValue) for the same /// [`Component`](crate::Component) as the [`Action`]. /// /// This method **DOES NOT** check the `DependentValuesUpdate` graph. That /// is done as part of [`Self::eligible_to_dispatch()`] pub fn is_eligible_to_dispatch(&self) -> bool { // Only Actions in the ActionState::Queued state are dispatchable. self.state() == ActionState::Queued } /// An Action is dispatchable if all of the following are true: /// * The action is in the state [`ActionState::Queued`](ActionState) /// * The graph of values for `DependentValuesUpdate` does *NOT* include /// *ANY* [`AttributeValue`s](AttributeValue) for the same /// [`Component`](crate::Component) as the [`Action`]. pub async fn eligible_to_dispatch(ctx: &DalContext) -> ActionResult<Vec<ActionId>> { let span = current_span_for_instrument_at!("info"); let start = Instant::now(); let action_dependency_graph = ActionDependencyGraph::for_workspace(ctx).await?; span.record( "si.rebase.action_dependency_graph_time", start.elapsed().as_millis(), ); let mut result = Vec::with_capacity(action_dependency_graph.remaining_actions().len()); let dependent_value_graph = DependentValueGraph::new( ctx, DependentValueRoot::get_dependent_value_roots(ctx).await?, ) .await?; span.record( "si.rebase.dependent_value_graph_time", start.elapsed().as_millis(), ); // Find the ComponentIds for all AttributeValues in the full dependency graph for the // queued/running DependentValuesUpdate. We'll want to hold off on dispatching any Actions // that would be operating on the same Component until after the DependentValuesUpdate has // finished working with that Component. let mut dvu_component_ids: HashSet<ComponentId> = HashSet::new(); for value in dependent_value_graph.all_value_ids() { if let Some(component_id) = dependent_value_graph.cached_component_id_for_value(value) { dvu_component_ids.insert(component_id); } } for possible_action_id in action_dependency_graph.independent_actions() { let action = Action::get_by_id(ctx, possible_action_id).await?; if action.is_eligible_to_dispatch() { if let Some(action_component_id) = Action::component_id(ctx, action.id()).await? { if dvu_component_ids.contains(&action_component_id) { // This action is for a Component that currently involved in the queued // DependentValuesUpdate graph. We don't want to dispatch the Action until // the DependentValuesUpdate job has completely finished // processing/populating values that the Action might need to work with. continue; } } result.push(possible_action_id); } } Ok(result) } #[instrument(name = "workspace_snapshot.dispatch_action", level = "info", skip_all, fields( si.action.id = ?action_id, ))] pub async fn dispatch_action(ctx: &DalContext, action_id: ActionId) -> ActionResult<()> { Action::set_state(ctx, action_id, ActionState::Dispatched).await?; ctx.enqueue_action_job(ctx.workspace_pk()?, ctx.change_set_id(), action_id) .await?; Ok(()) } pub async fn last_func_run_id_for_id_opt( ctx: &DalContext, id: ActionId, ) -> ActionResult<Option<FuncRunId>> { Ok(ctx .layer_db() .func_run() .get_last_run_for_action_id_opt(ctx.events_tenancy().workspace_pk, id) .await? .map(|f| f.id())) } /// This function behaves differently if on head vs. in an open change set. /// For a given component, if we're on head already, we'll simply enqueue the refresh action as normal /// If we're not on head, we check to see if the component exists on head /// If it does, we enqueue the refresh function on head (and when it runs, it will be replayed) /// as we do not want a change set to have a more up-to-date perspective than head /// If it does NOT exist on head, we will dispatch the refresh action directly! pub async fn enqueue_refresh_in_correct_change_set_and_commit( ctx: &DalContext, component_id: ComponentId, ) -> ActionResult<()> { let head_change_set_id = ctx.get_workspace_default_change_set_id().await?; // if we're already on head, just enqueue it if ctx.change_set_id() == head_change_set_id { Self::enqueue_refresh(ctx, component_id, false).await } else { // Not on head, so build the head dal ctx let head_ctx = ctx.clone_with_head().await?; // if the component exists on head, enqueue the refresh action there if Component::exists_by_id(&head_ctx, component_id).await? { Self::enqueue_refresh(&head_ctx, component_id, false).await } // otherwise, if the component doesn't have a resource, just enqueue it // (this is a backend guard, the button will be hidden from the user) else if Component::resource_by_id(ctx, component_id) .await? .is_none() { Self::enqueue_refresh(ctx, component_id, false).await } // last case, component doesn't exist on head, but has a resource (which can only be true if we ran an import or other mgmt func) // so we can enqueue and dispatch right away else { Self::enqueue_refresh(ctx, component_id, true).await } } } async fn enqueue_refresh( ctx: &DalContext, component_id: ComponentId, should_dispatch: bool, ) -> ActionResult<()> { let schema_variant_id = Component::schema_variant_id(ctx, component_id).await?; let refresh_actions = ActionPrototype::find_by_kind_for_schema_or_variant( ctx, ActionKind::Refresh, schema_variant_id, ) .await?; if let Ok(prototype) = refresh_actions.iter().exactly_one().map_err(|_| { ActionError::UnexpectedNumberOfActionKinds(ActionKind::Refresh, schema_variant_id) }) { let maybe_duplicate_action = Action::find_for_kind_and_component_id(ctx, component_id, ActionKind::Refresh) .await?; // See if there's an existing Refresh Action (single) for this component if let Some(&action_id) = maybe_duplicate_action.iter().at_most_one().map_err(|_| { ActionError::UnexpectedNumberOfActionsEnqueuedForComponent( ActionKind::Refresh, component_id, ) })? { if should_dispatch { // If we're dispatching and there's already an action enqueued, and the originating change set // is this change set, dispatch it! Otherwise, create a new action and dispatch it. let action = Action::get_by_id(ctx, action_id).await?; if action.originating_changeset_id() == ctx.change_set_id() { Action::dispatch_action(ctx, action_id).await?; } else { let new_action_id = Self::enqueue_new_refresh(ctx, prototype.id(), component_id).await?; Action::dispatch_action(ctx, new_action_id).await?; } } else { // Not dispatching - re-enqueue the existing action let action = Action::get_by_id(ctx, action_id).await?; match action.state() { ActionState::Failed | ActionState::OnHold => { Action::set_state(ctx, action_id, ActionState::Queued).await?; } ActionState::Dispatched | ActionState::Queued | ActionState::Running => { // no op if the action is already queued/dispatched/running return Ok(()); } } } } else { // No duplicate actions - create a new one and optionally dispatch let action_id = Self::enqueue_new_refresh(ctx, prototype.id(), component_id).await?; if should_dispatch { Action::dispatch_action(ctx, action_id).await?; } } ctx.commit().await?; }; Ok(()) } async fn enqueue_new_refresh( ctx: &DalContext, prototype_id: ActionPrototypeId, component_id: ComponentId, ) -> ActionResult<ActionId> { let func_id = ActionPrototype::func_id(ctx, prototype_id).await?; let func = Func::get_by_id(ctx, func_id).await?; let action = Action::new(ctx, prototype_id, Some(component_id)).await?; ctx.write_audit_log( AuditLogKind::AddAction { prototype_id, action_kind: si_events::ActionKind::Refresh, func_id, func_display_name: func.display_name, func_name: func.name.clone(), component_id: Some(component_id), }, func.name, ) .await?; Ok(action.id()) } } impl WsEvent { pub async fn action_list_updated(ctx: &DalContext) -> WsEventResult<Self> { WsEvent::new(ctx, WsPayload::ActionsListUpdated(ctx.change_set_id())).await } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server