Skip to main content
Glama
management_func.rs17.6 kB
use async_trait::async_trait; use itertools::Itertools; use pinga_core::api_types::job_execution_request::JobArgsVCurrent; use si_db::{ ManagementFuncExecutionError, ManagementFuncJobState, ManagementState, }; use si_events::audit_log::AuditLogKind; use si_id::{ ChangeSetId, ComponentId, FuncRunId, ManagementFuncJobStateId, ManagementPrototypeId, ViewId, WorkspacePk, }; use telemetry::prelude::*; use thiserror::Error; use tokio::{ task::JoinError, time::Duration, }; use veritech_client::ManagementFuncStatus; use crate::{ Component, ComponentError, DalContext, Func, FuncError, TransactionsError, WorkspaceSnapshotError, WsEvent, WsEventError, action::{ Action, ActionError, }, job::consumer::{ DalJob, JobCompletionState, JobConsumer, JobConsumerResult, }, management::{ ManagementError, ManagementFuncReturn, ManagementOperator, prototype::{ ManagementFuncKind, ManagementPrototype, ManagementPrototypeError, ManagementPrototypeExecution, }, }, workspace_snapshot::{ DependentValueRoot, dependent_value_root::DependentValueRootError, }, }; #[remain::sorted] #[derive(Debug, Error)] pub enum ManagementFuncJobError { #[error("action error: {0}")] Action(#[from] ActionError), #[error("component error: {0}")] Component(#[from] Box<ComponentError>), #[error("dependent value root error: {0}")] DependentValueRoot(#[from] Box<DependentValueRootError>), #[error("func error: {0}")] Func(#[from] Box<FuncError>), #[error("management prototype {0} is not valid for component {1}")] InvalidPrototypeForComponent(ManagementPrototypeId, ComponentId), #[error("management error: {0}")] Management(#[from] Box<ManagementError>), #[error("management execution state error: {0}")] ManagementFuncExecutionState(#[from] ManagementFuncExecutionError), #[error("management func js execution failed")] ManagementFuncJsExecutionFailed, #[error("management prototype error: {0}")] ManagementPrototype(#[from] Box<ManagementPrototypeError>), #[error( "no pending execution for component {0} and management prototype {1} in change set {2}" )] NoPendingExecution(ComponentId, ManagementPrototypeId, ChangeSetId), #[error("serde json error: {0}")] Serde(#[from] serde_json::Error), #[error("tokio task error: {0}")] TokioTask(#[from] JoinError), #[error("transactions error: {0}")] Transactions(#[from] Box<TransactionsError>), #[error( "management func {0} for component {1} waited too long for dependent values to be calculated" )] WaitedTooLongForDependentValueRoots(ManagementPrototypeId, ComponentId), #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] Box<WorkspaceSnapshotError>), #[error("ws event error: {0}")] WsEvent(#[from] Box<WsEventError>), } impl From<ComponentError> for ManagementFuncJobError { fn from(value: ComponentError) -> Self { Box::new(value).into() } } impl From<DependentValueRootError> for ManagementFuncJobError { fn from(value: DependentValueRootError) -> Self { Box::new(value).into() } } impl From<FuncError> for ManagementFuncJobError { fn from(value: FuncError) -> Self { Box::new(value).into() } } impl From<ManagementError> for ManagementFuncJobError { fn from(value: ManagementError) -> Self { Box::new(value).into() } } impl From<ManagementPrototypeError> for ManagementFuncJobError { fn from(value: ManagementPrototypeError) -> Self { Box::new(value).into() } } impl From<TransactionsError> for ManagementFuncJobError { fn from(value: TransactionsError) -> Self { Box::new(value).into() } } impl From<WorkspaceSnapshotError> for ManagementFuncJobError { fn from(value: WorkspaceSnapshotError) -> Self { Box::new(value).into() } } impl From<WsEventError> for ManagementFuncJobError { fn from(value: WsEventError) -> Self { Box::new(value).into() } } pub type ManagementFuncJobResult<T> = Result<T, ManagementFuncJobError>; const WAIT_MS: u64 = 50; const MAX_WAITS: u64 = 5_000; #[derive(Clone, Debug)] pub struct ManagementFuncJob { workspace_id: WorkspacePk, change_set_id: ChangeSetId, component_id: ComponentId, prototype_id: ManagementPrototypeId, view_id: ViewId, request_ulid: ulid::Ulid, } impl ManagementFuncJob { pub fn new( workspace_id: WorkspacePk, change_set_id: ChangeSetId, prototype_id: ManagementPrototypeId, component_id: ComponentId, view_id: ViewId, request_ulid: Option<ulid::Ulid>, ) -> Box<Self> { // NOTE(nick): this may seem hacky, but it is acceptable to create this later down the // chain. That's because this ID is threaded through all management operations and its // lifecycle. None of that tracking requires you create the ID at the outermost point. You // just may miss early event(s). let request_ulid = request_ulid.unwrap_or_default(); Self { workspace_id, change_set_id, component_id, prototype_id, view_id, request_ulid, } .into() } async fn spin_until_ready( &self, ctx: &mut DalContext, max: u64, ) -> Result<(), ManagementFuncJobError> { let mut count = 0; loop { if count >= max { return Err(ManagementFuncJobError::WaitedTooLongForDependentValueRoots( self.prototype_id, self.component_id, )); } ctx.update_snapshot_to_visibility().await?; if !DependentValueRoot::roots_exist(ctx).await? { break; } tokio::time::sleep(Duration::from_millis(WAIT_MS)).await; count += 1; // Signal progress when spinning until ready. WsEvent::management_operations_in_progress(ctx, self.request_ulid) .await? .publish_immediately(ctx) .await?; } Ok(()) } async fn run_inner( &self, ctx: &DalContext, execution_state_id: ManagementFuncJobStateId, ) -> ManagementFuncJobResult<JobCompletionState> { let mut ctx_clone = ctx.clone(); // Loop for 5_000 * 50 ms (= 250 seconds max) and then mark job as failed if not ready self.spin_until_ready(&mut ctx_clone, MAX_WAITS).await?; // We want to operate on the snapshot that is "ready" let ctx = &ctx_clone; let (geometries, placeholders, run_channel, func_run_id) = ManagementPrototype::start_execution( ctx, self.prototype_id, self.component_id, self.view_id.into(), ) .await?; // Signal progress after execution is started. WsEvent::management_operations_in_progress(ctx, self.request_ulid) .await? .publish_immediately(ctx) .await?; Self::executing_state(ctx, execution_state_id, func_run_id).await?; let execution_result = ManagementPrototype::finalize_execution( ctx, self.component_id, self.prototype_id, geometries, placeholders, run_channel, func_run_id, ) .await?; // Signal progress after execution is finalized, but before operation. WsEvent::management_operations_in_progress(ctx, self.request_ulid) .await? .publish_immediately(ctx) .await?; let maybe_created_ids = self .operate(ctx, execution_state_id, execution_result) .await?; Self::success_state(ctx, execution_state_id).await?; let kind = ManagementPrototype::kind_by_id(ctx, self.prototype_id).await?; if kind == ManagementFuncKind::Import { // if this is an import, let's auto subscribe! // todo: write this (and other operations errors) down somewhere Component::autosubscribe(ctx, self.component_id).await?; } else if kind == ManagementFuncKind::Discover { // if we discovered, let's auto subscribe what was just discovered if let Some(created_component_ids) = maybe_created_ids { for component_id in created_component_ids { // todo: write this (and other operations errors) down somewhere Component::autosubscribe(ctx, component_id).await?; } } } // if the management prototype is for an Import func and we're not on head, and the component doesn't already have a resource, let's dispatch it! // note: this is to future proof for when our import functions set the resource directly // as if the resource has already been set, we don't need to run the refresh func in this change set if kind == ManagementFuncKind::Import && ctx.get_workspace_default_change_set_id().await? != ctx.change_set_id() && Component::exists_on_head_by_ids(ctx, &[self.component_id]) .await? .is_empty() && Component::resource_by_id(ctx, self.component_id) .await? .is_none() { let actions = Action::find_for_kind_and_component_id( ctx, self.component_id, crate::action::prototype::ActionKind::Refresh, ) .await?; if let Ok(action_id) = actions.iter().exactly_one() { // Note: this is intentionally not waiting for DVU // or worrying about the state of other actions and // whether this action is dependent on something else // we don't care, because the user just ran import, // and post-import, the component already has everything it // needs to successfully run the refresh function Action::dispatch_action(ctx, *action_id).await?; } } ctx.commit().await?; Ok(JobCompletionState::Done) } async fn operate( &self, ctx: &DalContext, execution_state_id: ManagementFuncJobStateId, mut execution_result: ManagementPrototypeExecution, ) -> ManagementFuncJobResult<Option<Vec<ComponentId>>> { let result = execution_result .result .take() .ok_or(ManagementFuncJobError::ManagementFuncJsExecutionFailed)?; Self::operating_state(ctx, execution_state_id).await?; let result: ManagementFuncReturn = result.try_into()?; let mut created_component_ids = None; if result.status == ManagementFuncStatus::Ok { if let Some(operations) = result.operations { // Signal progress before performing operations. WsEvent::management_operations_in_progress(ctx, self.request_ulid) .await? .publish_immediately(ctx) .await?; created_component_ids = ManagementOperator::new( ctx, self.component_id, operations, execution_result, self.view_id.into(), self.request_ulid, ) .await? .operate() .await?; } } let func_id = ManagementPrototype::func_id(ctx, self.prototype_id).await?; let func = Func::get_by_id(ctx, func_id).await?; WsEvent::management_operations_complete( ctx, // TODO(nick): make this required. Some(self.request_ulid), func.name.clone(), result.message.clone(), result.status, created_component_ids.clone(), ) .await? .publish_on_commit(ctx) .await?; ctx.write_audit_log( AuditLogKind::ManagementOperationsComplete { component_id: self.component_id, prototype_id: self.prototype_id, func_id, func_name: func.name.clone(), status: match result.status { ManagementFuncStatus::Ok => "ok", ManagementFuncStatus::Error => "error", } .to_string(), message: result.message.clone(), }, func.name, ) .await?; Ok(created_component_ids) } async fn run_prototype( &self, ctx: &mut DalContext, ) -> ManagementFuncJobResult<JobCompletionState> { // Signal progress when running the prototype. WsEvent::management_operations_in_progress(ctx, self.request_ulid) .await? .publish_immediately(ctx) .await?; let pending_execution = ManagementFuncJobState::get_pending(ctx, self.component_id, self.prototype_id) .await? .ok_or(ManagementFuncJobError::NoPendingExecution( self.component_id, self.prototype_id, self.change_set_id, ))?; let execution_state_id = pending_execution.id(); match self.run_inner(ctx, execution_state_id).await { Ok(completion_state) => Ok(completion_state), Err(err) => { let failure_message = err.to_string(); Self::fail_state(ctx, execution_state_id, failure_message).await?; // Signal failure because we never reach a "complete" state. WsEvent::management_operations_failed(ctx, self.request_ulid) .await? .publish_immediately(ctx) .await?; Err(err) } } } async fn transition_state( ctx: &DalContext, execution_state_id: ManagementFuncJobStateId, new_state: si_db::ManagementState, func_run_id: Option<FuncRunId>, failure_message: Option<String>, ) -> ManagementFuncJobResult<()> { let mut ctx_clone = ctx.clone(); // Restart connections to ensure we don't flush jobs, and because we // want to commit the changes to the database independently of any other // queries made in the "parent" contexts. ctx_clone.restart_connections().await?; if new_state == ManagementState::Failure { if let Ok(snap) = ctx_clone.workspace_snapshot() { snap.revert().await; } } ManagementFuncJobState::transition_state( &ctx_clone, execution_state_id, new_state, func_run_id, failure_message, ) .await?; ctx_clone.commit_no_rebase().await?; Ok(()) } async fn executing_state( ctx: &DalContext, execution_state_id: ManagementFuncJobStateId, func_run_id: FuncRunId, ) -> ManagementFuncJobResult<()> { Self::transition_state( ctx, execution_state_id, ManagementState::Executing, Some(func_run_id), None, ) .await } async fn operating_state( ctx: &DalContext, execution_state_id: ManagementFuncJobStateId, ) -> ManagementFuncJobResult<()> { Self::transition_state( ctx, execution_state_id, ManagementState::Operating, None, None, ) .await } async fn success_state( ctx: &DalContext, execution_state_id: ManagementFuncJobStateId, ) -> ManagementFuncJobResult<()> { Self::transition_state( ctx, execution_state_id, ManagementState::Success, None, None, ) .await } async fn fail_state( ctx: &DalContext, execution_state_id: ManagementFuncJobStateId, message: String, ) -> ManagementFuncJobResult<()> { Self::transition_state( ctx, execution_state_id, ManagementState::Failure, None, Some(message), ) .await } } impl DalJob for ManagementFuncJob { fn args(&self) -> JobArgsVCurrent { JobArgsVCurrent::ManagementFunc { component_id: self.component_id, prototype_id: self.prototype_id, view_id: self.view_id, // TODO(nick): make this required. request_ulid: Some(self.request_ulid), } } fn workspace_id(&self) -> WorkspacePk { self.workspace_id } fn change_set_id(&self) -> ChangeSetId { self.change_set_id } } #[async_trait] impl JobConsumer for ManagementFuncJob { #[instrument(name = "management_func.run", level = "info", skip_all)] async fn run(&self, ctx: &mut DalContext) -> JobConsumerResult<JobCompletionState> { Ok(self.run_prototype(ctx).await?) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server