Skip to main content
Glama
validation.rs (15.7 kB)
use std::{ collections::VecDeque, sync::Arc, }; use itertools::join; use joi_validator::Validator; use serde::{ Deserialize, Serialize, }; use si_data_nats::NatsError; use si_data_pg::PgError; use si_events::{ FuncRunState, Timestamp, }; use si_layer_cache::LayerDbError; use telemetry::prelude::*; use thiserror::Error; use crate::{ AttributeValue, AttributeValueId, ChangeSetError, Component, ComponentError, ComponentId, DalContext, FuncError, Prop, TransactionsError, attribute::value::AttributeValueError, func::{ backend::validation::ValidationRunResult, runner::{ FuncRunner, FuncRunnerError, }, }, layer_db_types::{ ValidationContent, ValidationContentV1, }, prop::PropError, schema::variant::SchemaVariantError, workspace_snapshot::{ WorkspaceSnapshotError, content_address::{ ContentAddress, ContentAddressDiscriminants, }, edge_weight::{ EdgeWeight, EdgeWeightKind, EdgeWeightKindDiscriminants, }, node_weight::{ NodeWeight, NodeWeightError, }, }, }; #[allow(clippy::large_enum_variant)] #[remain::sorted] #[derive(Error, Debug)] pub enum ValidationError { #[error("attribute value error: {0}")] AttributeValue(#[from] Box<AttributeValueError>), #[error("change set error: {0}")] ChangeSet(#[from] ChangeSetError), #[error("component error: {0}")] Component(#[from] Box<ComponentError>), #[error("func error: {0}")] Func(#[from] Box<FuncError>), #[error("func run went away before a value could be sent down the channel")] FuncRunGone, #[error("func runner error: {0}")] FuncRunner(#[from] Box<FuncRunnerError>), #[error("invalid prop id")] InvalidPropId, #[error("layer db error: {0}")] LayerDb(#[from] LayerDbError), #[error("nats txn error: {0}")] Nats(#[from] NatsError), #[error("node weight error: {0}")] NodeWeight(#[from] NodeWeightError), #[error("no prop or no validation format on the prop for attribute value: {0}")] NoValidationFormatForAttributeValue(AttributeValueId), #[error("pg error: {0}")] Pg(#[from] PgError), #[error("prop error: {0}")] Prop(#[from] PropError), 
#[error("schema not found")] SchemaNotFound, #[error("schema variant error: {0}")] SchemaVariant(#[from] Box<SchemaVariantError>), #[error("schema variant not found")] SchemaVariantNotFound, #[error("error serializing/deserializing json: {0}")] SerdeJson(#[from] serde_json::Error), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] Box<WorkspaceSnapshotError>), } impl From<AttributeValueError> for ValidationError { fn from(e: AttributeValueError) -> Self { Box::new(e).into() } } impl From<ComponentError> for ValidationError { fn from(e: ComponentError) -> Self { Box::new(e).into() } } impl From<FuncError> for ValidationError { fn from(e: FuncError) -> Self { Box::new(e).into() } } impl From<FuncRunnerError> for ValidationError { fn from(e: FuncRunnerError) -> Self { Box::new(e).into() } } impl From<SchemaVariantError> for ValidationError { fn from(e: SchemaVariantError) -> Self { Box::new(e).into() } } impl From<WorkspaceSnapshotError> for ValidationError { fn from(e: WorkspaceSnapshotError) -> Self { Box::new(e).into() } } pub type ValidationResult<T> = Result<T, ValidationError>; pub use si_id::ValidationOutputId; // This type goes into the content store so cannot be re-ordered, only extended #[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)] pub enum ValidationStatus { Pending, Error, Failure, Success, } #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] pub struct ValidationOutput { pub status: ValidationStatus, pub message: Option<String>, } /// Stores the validation output for an [AttributeValue]. Should only exist if /// the av is for a prop and that prop has a validation format bound to it. 
/// /// Its only relation is established as follows: /// /// [AttributeValue] -- [EdgeWeightKind::ValidationOutput] --> [ValidationOutputNode] #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] pub struct ValidationOutputNode { pub id: ValidationOutputId, pub validation: ValidationOutput, } impl ValidationOutputNode { pub async fn upsert_or_wipe_for_attribute_value( ctx: &DalContext, attribute_value_id: AttributeValueId, maybe_validation_output: Option<ValidationOutput>, ) -> ValidationResult<Option<Self>> { // If no validation format, wipe any validation results if ValidationOutput::get_format_for_attribute_value_id(ctx, attribute_value_id) .await? .is_none() { Self::wipe_for_attribute_value_id(ctx, attribute_value_id).await?; return Ok(None); } // If no validation output, wipe any validation results let Some(validation) = maybe_validation_output else { Self::wipe_for_attribute_value_id(ctx, attribute_value_id).await?; return Ok(None); }; // Now we're sure we're creating something, compute content let timestamp = Timestamp::now(); let content = ValidationContentV1 { timestamp, status: validation.status, message: validation.message.clone(), }; let (hash, _) = ctx.layer_db().cas().write( Arc::new(ValidationContent::V1(content.clone()).into()), None, ctx.events_tenancy(), ctx.events_actor(), )?; let workspace_snapshot = ctx.workspace_snapshot()?; // If validation node exists, replace it, else create a new one. let id = if let Some(existing_node) = Self::find_for_attribute_value_id(ctx, attribute_value_id).await? { let id = existing_node.id; let node_weight = workspace_snapshot .get_node_weight(id) .await? 
.get_content_node_weight_of_kind(ContentAddressDiscriminants::ValidationOutput)?; let mut new_node_weight = node_weight.clone(); new_node_weight.new_content_hash(hash)?; workspace_snapshot .add_or_replace_node(NodeWeight::Content(new_node_weight)) .await?; id } else { let id = workspace_snapshot.generate_ulid().await?; let lineage_id = workspace_snapshot.generate_ulid().await?; let node_weight = NodeWeight::new_content(id, lineage_id, ContentAddress::ValidationOutput(hash)); workspace_snapshot.add_or_replace_node(node_weight).await?; workspace_snapshot .add_edge( attribute_value_id, EdgeWeight::new(EdgeWeightKind::ValidationOutput), id, ) .await?; id.into() }; Ok(Some(Self { id, validation })) } pub async fn find_for_attribute_value_id( ctx: &DalContext, attribute_value_id: AttributeValueId, ) -> ValidationResult<Option<ValidationOutputNode>> { let v = ctx .workspace_snapshot()? .outgoing_targets_for_edge_weight_kind( attribute_value_id, EdgeWeightKindDiscriminants::ValidationOutput, ) .await?; if let Some(validation_idx) = v.first() { let node_weight = ctx .workspace_snapshot()? .get_node_weight(*validation_idx) .await? .get_content_node_weight_of_kind(ContentAddressDiscriminants::ValidationOutput)?; let id = node_weight.id(); let ValidationContent::V1(ValidationContentV1 { status, message, .. }) = ctx .layer_db() .cas() .try_read_as(&node_weight.content_hash()) .await? .ok_or(WorkspaceSnapshotError::MissingContentFromStore(id))?; Ok(Some(Self { id: id.into(), validation: ValidationOutput { status, message }, })) } else { Ok(None) } } async fn wipe_for_attribute_value_id( ctx: &DalContext, attribute_value_id: AttributeValueId, ) -> ValidationResult<()> { let workspace_snapshot = ctx.workspace_snapshot()?; for validation_idx in workspace_snapshot .outgoing_targets_for_edge_weight_kind( attribute_value_id, EdgeWeightKindDiscriminants::ValidationOutput, ) .await? { let validation_id = workspace_snapshot .get_node_weight(validation_idx) .await? 
.id(); workspace_snapshot.remove_node_by_id(validation_id).await?; } Ok(()) } } impl ValidationOutput { pub async fn get_format_for_attribute_value_id( ctx: &DalContext, attribute_value_id: AttributeValueId, ) -> ValidationResult<Option<String>> { let validation_format = match AttributeValue::prop_id_opt(ctx, attribute_value_id).await? { Some(prop_id) => Prop::get_by_id(ctx, prop_id).await?.validation_format, None => None, }; Ok(validation_format) } /// If an attribute value is for a [Prop](Prop) that has a `validation_format`, run a validation /// for that format. pub(crate) async fn compute_for_attribute_value( ctx: &DalContext, attribute_value_id: AttributeValueId, parent_span: Span, ) -> ValidationResult<Option<ValidationOutput>> { // Check if this attribute value has subscriptions to properties that exist but don't have values yet // If so, skip validation until the subscribed values are populated // If any subscription resolves to a valid property but that property has no value, skip validation if let Some(subscriptions) = AttributeValue::subscriptions(ctx, attribute_value_id).await? { for subscription in subscriptions { if let Some(resolved_av_id) = subscription.resolve(ctx).await? { // Subscription is valid (path exists), check if the resolved AV has a value let resolved_value = AttributeValue::get_by_id(ctx, resolved_av_id) .await? .value(ctx) .await?; if resolved_value.is_none() { // Subscription exists but hasn't resolved to a value yet - don't validate return Ok(None); } } } } let value = AttributeValue::get_by_id(ctx, attribute_value_id) .await? .value_or_default(ctx) .await?; match Self::get_format_for_attribute_value_id(ctx, attribute_value_id).await? 
{ None => Ok(None), Some(validation_format) => Ok(Some( // If we can't deserialize the validation format, run remotely match serde_json::from_str(&validation_format) { Ok(validator) => run_locally(validator, value, parent_span), Err(serde_error) => { run_remotely( ctx, attribute_value_id, value, validation_format, Some(serde_error), parent_span, ) .await? } }, )), } } pub async fn list_for_component( ctx: &DalContext, component_id: ComponentId, ) -> ValidationResult<Vec<(AttributeValueId, ValidationOutput)>> { let component = Component::get_by_id(ctx, component_id).await?; let domain_av = component.domain_prop_attribute_value(ctx).await?; let mut outputs = vec![]; let mut queue = VecDeque::from(vec![domain_av]); while let Some(attribute_value_id) = queue.pop_front() { let maybe_validation_output = ValidationOutputNode::find_for_attribute_value_id(ctx, attribute_value_id) .await? .map(|node| node.validation); let children_av_ids = AttributeValue::get_child_av_ids_in_order(ctx, attribute_value_id).await?; queue.extend(children_av_ids); if let Some(validation_output) = maybe_validation_output { outputs.push((attribute_value_id, validation_output)); } } Ok(outputs) } } #[instrument( name = "validation.run_locally", level = "info", parent = parent_span, skip_all, fields(si.validation.rules = %validator.rule_names().join(", ")) )] fn run_locally( validator: Validator, value: Option<serde_json::Value>, parent_span: Span, ) -> ValidationOutput { // We treat value as undefined if it's null let value = match value { Some(serde_json::Value::Null) => None, value => value, }; match validator.validate(&value).error { None => ValidationOutput { status: ValidationStatus::Success, message: None, }, Some(error) => ValidationOutput { status: ValidationStatus::Failure, message: Some(join(error.details.iter().map(|e| &e.message), "\n")), }, } } #[instrument( name = "validation.run_remotely", level = "info", parent = parent_span, skip_all, fields(si.validation.because = because.map(|e| 
e.to_string())) )] async fn run_remotely( ctx: &DalContext, attribute_value_id: AttributeValueId, value: Option<serde_json::Value>, validation_format: String, because: Option<serde_json::Error>, parent_span: Span, ) -> ValidationResult<ValidationOutput> { let result_channel = FuncRunner::run_validation_format(ctx, attribute_value_id, value, validation_format) .await?; let func_result_value = match result_channel .await .map_err(|_| ValidationError::FuncRunGone)? { Ok(func_run_result) => func_run_result, Err(FuncRunnerError::ResultFailure { kind, message, .. }) => { return Ok(ValidationOutput { status: ValidationStatus::Error, message: Some(format!("{kind}: {message}")), }); } Err(e) => return Err(e.into()), }; let message = match func_result_value.value() { Some(raw_value) => serde_json::from_value::<ValidationRunResult>(raw_value.clone())?.error, None => None, }; FuncRunner::update_run(ctx, func_result_value.func_run_id(), |func_run| { func_run.set_state(FuncRunState::Success) }) .await?; Ok(ValidationOutput { status: match message { Some(_) => ValidationStatus::Failure, None => ValidationStatus::Success, }, message, }) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.