Skip to main content
Glama
qualification.rs13.5 kB
use serde::{ Deserialize, Serialize, }; use si_data_pg::PgError; use si_frontend_types::ComponentQualificationStats; use si_id::AttributeValueId; use si_layer_cache::LayerDbError; use strum::{ AsRefStr, Display, EnumIter, EnumString, }; use telemetry::prelude::*; use thiserror::Error; use crate::{ AttributeValue, Component, ComponentError, ComponentId, DalContext, Prop, WsEventResult, attribute::value::AttributeValueError, component::qualification::QualificationEntry, func::FuncError, prop::PropError, validation::{ ValidationError, ValidationOutput, ValidationStatus, }, ws_event::{ WsEvent, WsPayload, }, }; #[derive(Deserialize, Serialize, Debug)] pub struct QualificationSummaryForComponent { pub component_id: ComponentId, pub component_name: String, pub total: u64, pub warned: u64, pub succeeded: u64, pub failed: u64, } #[derive(Deserialize, Serialize, Debug)] pub struct QualificationSummary { pub total: u64, pub succeeded: u64, pub warned: u64, pub failed: u64, pub components: Vec<QualificationSummaryForComponent>, } #[allow(clippy::large_enum_variant)] #[remain::sorted] #[derive(Error, Debug)] pub enum QualificationSummaryError { #[error("component error: {0}")] Component(#[from] Box<ComponentError>), #[error("pg error: {0}")] Pg(#[from] PgError), } impl From<ComponentError> for QualificationSummaryError { fn from(value: ComponentError) -> Self { Box::new(value).into() } } pub type QualificationSummaryResult<T> = Result<T, QualificationSummaryError>; impl QualificationSummary { #[instrument(level = "debug", skip_all)] pub async fn get_summary(ctx: &DalContext) -> QualificationSummaryResult<QualificationSummary> { let mut components_succeeded = 0; let mut components_warned = 0; let mut components_failed = 0; let mut total = 0; let component_ids = Component::list_ids(ctx).await?; let mut component_summaries = Vec::with_capacity(component_ids.len()); for component_id in component_ids { let stats = Self::individual_stats(ctx, component_id).await?; // Update counters for all components. if stats.failed > 0 { components_failed += 1; } else if stats.warned > 0 { components_warned += 1; } else { components_succeeded += 1; } total += stats.total; component_summaries.push(QualificationSummaryForComponent { component_id, component_name: Component::name_by_id(ctx, component_id).await?, total: stats.total, warned: stats.warned, succeeded: stats.succeeded, failed: stats.failed, }); } Ok(QualificationSummary { total, succeeded: components_succeeded, warned: components_warned, failed: components_failed, components: component_summaries, }) } #[instrument(level = "debug", skip_all)] pub async fn individual_stats( ctx: &DalContext, component_id: ComponentId, ) -> QualificationSummaryResult<ComponentQualificationStats> { let qualification_statuses = Component::list_qualification_statuses(ctx, component_id).await?; let total = qualification_statuses.len() as u64; let mut succeeded = 0; let mut warned = 0; let mut failed = 0; for status in qualification_statuses.iter().flatten() { match status { QualificationSubCheckStatus::Success => succeeded += 1, QualificationSubCheckStatus::Warning => warned += 1, QualificationSubCheckStatus::Failure => failed += 1, QualificationSubCheckStatus::Unknown => {} } } // FIXME(nick): delete this when we switch to the new UI. let running = total .checked_sub(warned) .and_then(|v| v.checked_sub(succeeded)) .and_then(|v| v.checked_sub(failed)) .unwrap_or(0); Ok(ComponentQualificationStats { total, warned, succeeded, failed, running, }) } } #[remain::sorted] #[derive(Error, Debug)] pub enum QualificationError { #[error("attribute value error: {0}")] AttributeValue(#[from] AttributeValueError), #[error("func error: {0}")] Func(#[from] FuncError), #[error("layer db error: {0}")] LayerDb(#[from] LayerDbError), #[error("no value returned in qualification function result")] NoValue, #[error("prop error: {0}")] Prop(#[from] PropError), #[error("error serializing/deserializing json: {0}")] SerdeJson(#[from] serde_json::Error), #[error("validation resolver error: {0}")] ValidationResolver(#[from] ValidationError), } #[derive(Deserialize, Serialize, Debug, Clone, Default, PartialEq, Eq)] pub struct QualificationResult { pub status: QualificationSubCheckStatus, pub title: Option<String>, pub link: Option<String>, pub sub_checks: Vec<QualificationSubCheck>, } /// A view on "OutputStream" from cyclone. #[derive(Deserialize, Serialize, Debug, Clone, Default, PartialEq, Eq)] pub struct QualificationOutputStreamView { pub stream: String, pub line: String, pub level: String, } #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct QualificationView { pub title: String, /// A collection of "OutputStream" views from cyclone. pub output: Vec<QualificationOutputStreamView>, pub description: Option<String>, pub link: Option<String>, pub result: Option<QualificationResult>, pub qualification_name: String, pub finalized: bool, } impl PartialOrd for QualificationView { fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) } } impl Ord for QualificationView { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.title.cmp(&other.title) } } impl QualificationView { pub async fn new( ctx: &DalContext, attribute_value_id: AttributeValueId, ) -> Result<Option<Self>, QualificationError> { let maybe_qual_run = ctx .layer_db() .func_run() .get_last_qualification_for_attribute_value_id( ctx.events_tenancy().workspace_pk, attribute_value_id, ) .await?; match maybe_qual_run { Some(qual_run) => { let qualification_entry: QualificationEntry = match AttributeValue::view(ctx, attribute_value_id).await? { Some(value) => serde_json::from_value(value)?, None => return Ok(None), }; let sub_check = QualificationSubCheck { description: qualification_entry .message .unwrap_or_else(|| "no description provided".to_string()), status: qualification_entry .result .unwrap_or(QualificationSubCheckStatus::Unknown), }; let result = Some(QualificationResult { status: qualification_entry .result .unwrap_or(QualificationSubCheckStatus::Unknown), title: Some(qual_run.function_name().to_string()), link: qual_run.function_link().map(str::to_string), sub_checks: vec![sub_check], }); let (output, finalized) = match ctx .layer_db() .func_run_log() .get_for_func_run_id(qual_run.id()) .await? { Some(func_run_logs) => { let output = func_run_logs .logs() .iter() .map(|l| QualificationOutputStreamView { stream: l.stream.clone(), line: l.message.clone(), level: l.level.clone(), }) .collect(); let finalized = func_run_logs.is_finalized(); (output, finalized) } None => (Vec::new(), false), }; Ok(Some(QualificationView { title: qual_run .function_display_name() .map(str::to_string) .unwrap_or_else(|| qual_run.function_name().to_string()), description: qual_run.function_description().map(str::to_string), link: qual_run.function_link().map(str::to_string), output, finalized, result, qualification_name: qual_run.function_name().to_string(), })) } None => Ok(None), } } pub async fn new_for_validations( ctx: &DalContext, component_id: ComponentId, ) -> Result<Option<Self>, QualificationError> { let mut status = QualificationSubCheckStatus::Success; let mut fail_counter = 0; let mut has_active_validations = false; // Note(victor): If this is ever the bottleneck, we could pretty easily compute a // validations summary for a component and store it on the graph during the // compute_validations job. // Then we'd just load it here and convert to the view struct let component_validation_outputs = ValidationOutput::list_for_component(ctx, component_id).await?; let mut output = Vec::with_capacity(component_validation_outputs.len()); for (av_id, validation_output) in component_validation_outputs { // We have validations therefore, we need to show the validations in the Qualifications output has_active_validations = true; if validation_output.status != ValidationStatus::Success { // We need to filter out any false positive results for subscriptions that // have yet to propagate their value during DVU - this would be a misleading // result for a user let av = AttributeValue::get_by_id(ctx, av_id).await?; if AttributeValue::subscriptions(ctx, av_id).await?.is_some() && av.value(ctx).await?.is_none() { continue; } status = QualificationSubCheckStatus::Failure; fail_counter += 1; let prop_id = AttributeValue::prop_id(ctx, av_id).await?; let prop = Prop::get_by_id(ctx, prop_id).await?; output.push(QualificationOutputStreamView { stream: "stdout".to_owned(), level: "log".to_owned(), line: format!( "{}: {}", prop.name, validation_output .message .clone() .unwrap_or("message missing".to_string()) ), }); } } if !has_active_validations { return Ok(None); } let result = Some(QualificationResult { status, title: None, link: None, sub_checks: vec![QualificationSubCheck { description: format!("Component has {fail_counter} invalid value(s)."), status, }], }); Ok(Some(QualificationView { title: "Prop Validations".to_owned(), description: None, link: None, output, finalized: true, result, qualification_name: "validations".to_owned(), })) } } #[remain::sorted] #[derive( AsRefStr, Clone, Debug, Deserialize, Display, EnumIter, EnumString, Eq, PartialEq, Serialize, Copy, Default, )] #[serde(rename_all = "camelCase")] #[strum(serialize_all = "camelCase")] pub enum QualificationSubCheckStatus { Failure, Success, #[default] Unknown, Warning, } #[derive(Deserialize, Serialize, Debug, Clone, Default, PartialEq, Eq)] pub struct QualificationSubCheck { pub description: String, pub status: QualificationSubCheckStatus, } #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct QualificationCheckPayload { component_id: ComponentId, } impl WsEvent { pub async fn checked_qualifications( ctx: &DalContext, component_id: ComponentId, ) -> WsEventResult<Self> { WsEvent::new( ctx, WsPayload::CheckedQualifications(QualificationCheckPayload { component_id }), ) .await } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server