Skip to main content
Glama
action.rs11.8 kB
use std::collections::{ HashMap, VecDeque, }; use async_trait::async_trait; use pinga_core::api_types::job_execution_request::JobArgsVCurrent; use serde::{ Deserialize, Serialize, }; use si_events::{ ActionResultState, FuncRunId, audit_log::AuditLogKind, }; use si_id::{ ChangeSetId, WorkspacePk, }; use telemetry::prelude::*; use telemetry_utils::metric; use veritech_client::{ ActionRunResultSuccess, ResourceStatus, }; use crate::{ ActionPrototypeId, Component, ComponentId, DalContext, Func, WsEvent, action::{ Action, ActionError, ActionId, ActionState, prototype::{ ActionKind, ActionPrototype, }, }, billing_publish, change_status::ChangeStatus, func::runner::FuncRunner, job::consumer::{ DalJob, JobCompletionState, JobConsumer, JobConsumerResult, }, }; #[derive(Debug, Deserialize, Serialize)] struct ActionJobArgs { id: ActionId, } impl From<ActionJob> for ActionJobArgs { fn from(value: ActionJob) -> Self { Self { id: value.action_id, } } } #[derive(Clone, Debug, Serialize)] pub struct ActionJob { workspace_id: WorkspacePk, change_set_id: ChangeSetId, action_id: ActionId, } impl ActionJob { pub fn new( workspace_id: WorkspacePk, change_set_id: ChangeSetId, action_id: ActionId, ) -> Box<Self> { Box::new(Self { workspace_id, change_set_id, action_id, }) } } impl DalJob for ActionJob { fn args(&self) -> JobArgsVCurrent { JobArgsVCurrent::Action { action_id: self.action_id, } } fn workspace_id(&self) -> WorkspacePk { self.workspace_id } fn change_set_id(&self) -> ChangeSetId { self.change_set_id } } #[async_trait] impl JobConsumer for ActionJob { #[instrument( name = "action_job.run", skip_all, level = "info", fields( si.action.id = ?self.action_id, // TODO: determine what this field is called for retries si.poopadoop.retries = Empty, ) )] async fn run(&self, ctx: &mut DalContext) -> JobConsumerResult<JobCompletionState> { metric!(counter.action_concurrency_count = 1); if let Err(err) = inner_run(ctx, self.action_id).await { error!(si.error.message = ?err, si.action.id = %self.action_id, "unable to finish action"); if let Err(err) = process_failed_action(ctx, self.action_id).await { error!(si.error.message = ?err, "failed to process action failure"); } } metric!(counter.action_concurrency_count = -1); Ok(JobCompletionState::Done) } } #[instrument( name = "action_job.action_task", skip_all, level = "info", fields( si.action.id = ?action_id, si.action.kind = Empty, si.component.id = Empty, // TODO: determine what this field is called for retries si.action_job.process.retries = Empty, ) )] async fn inner_run( ctx: &mut DalContext, action_id: ActionId, ) -> JobConsumerResult<Option<ActionRunResultSuccess>> { let (prototype_id, component_id) = prepare_for_execution(ctx, action_id).await?; // Execute the action function let (maybe_resource, func_run_id) = ActionPrototype::run(ctx, prototype_id, component_id).await?; // process the result process_execution_result(ctx, maybe_resource.as_ref(), action_id, func_run_id).await?; // conditionally perform any graph cleanups perform_graph_cleanups(ctx, prototype_id).await?; Ok(maybe_resource) } // Compute inputs for the action prototype run and backfill span attributes. async fn prepare_for_execution( ctx: &mut DalContext, action_id: ActionId, ) -> JobConsumerResult<(ActionPrototypeId, ComponentId)> { let span = Span::current(); let component_id = Action::component_id(ctx, action_id) .await? .ok_or(ActionError::ComponentNotFoundForAction(action_id))?; let prototype_id = Action::prototype_id(ctx, action_id).await?; let prototype = ActionPrototype::get_by_id(ctx, prototype_id).await?; span.record("si.action.kind", tracing::field::debug(&prototype.kind)); span.record("si.component.id", tracing::field::debug(&component_id)); let component_id = Action::component_id(ctx, action_id) .await? .ok_or(ActionError::ComponentNotFoundForAction(action_id))?; Ok((prototype_id, component_id)) } #[instrument( name = "action_job.process_execution", level = "info", skip_all, fields( si.action.id = ?action_id ), )] async fn process_execution_result( ctx: &mut DalContext, action_run_result: Option<&ActionRunResultSuccess>, action_id: ActionId, func_run_id: FuncRunId, ) -> JobConsumerResult<()> { let prototype_id = Action::prototype_id(ctx, action_id).await?; let prototype = ActionPrototype::get_by_id(ctx, prototype_id).await?; let func_id = ActionPrototype::func_id(ctx, prototype_id).await?; let func = Func::get_by_id(ctx, func_id).await?; let component_id = Action::component_id(ctx, action_id) .await? .ok_or(ActionError::ComponentNotFoundForAction(action_id))?; let component = Component::get_by_id(ctx, component_id).await?; let mut success = false; if let Some(run_result) = action_run_result { // Set the resource if we have a payload, regardless of status *and* assemble a // summary if run_result.payload.is_some() { // Send the create resource event if we're not updating an existing resource if component.resource(ctx).await?.is_none() { billing_publish::for_resource_create(ctx, component_id, func_run_id).await?; } component.set_resource(ctx, run_result.into()).await?; } // Set the resource id if we have one, even on failure. (although, why?) if let Some(resource_id) = &run_result.resource_id { component.set_resource_id(ctx, resource_id.as_str()).await?; } if run_result.status == ResourceStatus::Ok { success = true; // We have to call this before we remove the action from the graph let prototype_to_trigger = ActionPrototype::get_prototypes_to_trigger(ctx, action_id, prototype.id()).await?; // Remove `ActionId` from graph as the execution succeeded Action::remove_by_id(ctx, action_id).await?; // Clear the resource if the status is ok and we don't have a payload. This could // be from invoking a delete action directly, rather than deleting the component. if run_result.payload.is_none() { // Send the delete resource event if there is a resource to actually clear if component.resource(ctx).await?.is_some() { billing_publish::for_resource_delete(ctx, component_id, func_run_id).await?; } component.clear_resource(ctx).await?; if component.to_delete() { Component::remove(ctx, component.id()).await?; WsEvent::component_deleted(ctx, component.id()) .await? .publish_on_commit(ctx) .await?; } else { let mut diagram_sockets = HashMap::new(); let summary = component .into_frontend_type( ctx, None, ChangeStatus::Unmodified, &mut diagram_sockets, ) .await?; WsEvent::resource_refreshed(ctx, summary) .await? .publish_on_commit(ctx) .await?; } } else { let mut diagram_sockets = HashMap::new(); let summary = component .into_frontend_type(ctx, None, ChangeStatus::Unmodified, &mut diagram_sockets) .await?; WsEvent::resource_refreshed(ctx, summary) .await? .publish_on_commit(ctx) .await?; } for dependency_prototype_id in prototype_to_trigger { Action::new(ctx, dependency_prototype_id, Some(component_id)).await?; } } else { // If status is not ok, set action state to failed Action::set_state(ctx, action_id, ActionState::Failed).await?; } } else { // If the maybe_resource is none, set action state to failed Action::set_state(ctx, action_id, ActionState::Failed).await?; } WsEvent::action_list_updated(ctx) .await? .publish_on_commit(ctx) .await?; ctx.write_audit_log( AuditLogKind::RunAction { prototype_id, action_kind: prototype.kind.into(), func_id: func.id, func_name: func.name.clone(), func_display_name: func.display_name, run_status: success, }, func.name, ) .await?; // Send the rebase request with the resource updated (if applicable) ctx.commit().await?; ctx.update_snapshot_to_visibility().await?; Ok(()) } #[instrument( name = "action_job.perform_graph_cleanups", level = "info", skip_all, fields() )] async fn perform_graph_cleanups( ctx: &DalContext, prototype_id: ActionPrototypeId, ) -> JobConsumerResult<()> { // if the action kind was a delete, let's see if any components are ready to be removed that // weren't already let prototype = ActionPrototype::get_by_id(ctx, prototype_id).await?; if ActionKind::Destroy == prototype.kind { // after we commit check for removable components if we just successfully deleted a // component let to_delete_components = Component::list_to_be_deleted(ctx).await?; let mut work_queue = VecDeque::from(to_delete_components); while let Some(component_to_delete) = work_queue.pop_front() { let component = Component::try_get_by_id(ctx, component_to_delete).await?; if let Some(component) = component { if component.allowed_to_be_removed(ctx).await? { Component::remove(ctx, component.id()).await?; WsEvent::component_deleted(ctx, component.id()) .await? .publish_on_commit(ctx) .await?; // refetch to see if new potential candidates can be removed work_queue.extend(Component::list_to_be_deleted(ctx).await?); } } } ctx.commit().await?; } Ok(()) } #[instrument( name = "action_job.process_failed_action", level = "info", skip_all, fields( si.action.id = ?action_id, ), )] async fn process_failed_action(ctx: &DalContext, action_id: ActionId) -> JobConsumerResult<()> { info!(%action_id, "processing action failed"); Action::set_state(ctx, action_id, ActionState::Failed).await?; FuncRunner::update_run_for_action_id(ctx, action_id, |func_run| { func_run.set_action_result_state(Some(ActionResultState::Failure)) }) .await?; ctx.commit().await?; Ok(()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server