Skip to main content
Glama
new.rs (14 kB)
use std::{ collections::{ BTreeMap, VecDeque, }, sync::Arc, }; use si_events::{ ContentHash, Timestamp, }; use si_id::{ AttributeValueId, SchemaVariantId, ViewId, }; use telemetry::prelude::*; use super::{ Component, ComponentResult, }; use crate::{ AttributeValue, ComponentError, DalContext, EdgeWeightKind, InputSocket, OutputSocket, Prop, PropKind, SchemaVariant, action::{ Action, prototype::{ ActionKind, ActionPrototype, }, }, attribute::value::default_subscription::{ DefaultSubscription, DefaultSubscriptionSource, detect_possible_default_subscription_for_prop, }, dependency_graph::DependencyGraph, diagram::geometry::Geometry, layer_db_types::{ ComponentContent, ComponentContentV2, }, validation::ValidationOutput, workspace_snapshot::{ DependentValueRoot, node_weight::{ NodeWeight, category_node_weight::CategoryNodeKind, }, }, }; #[derive(Debug, Clone)] enum PotentialSubscription { Existing { source_av_id: AttributeValueId, dest_av_id: AttributeValueId, }, ArrayElem { source_av_id: AttributeValueId, parent_av_id: AttributeValueId, }, } impl PotentialSubscription { pub async fn subscribe(&self, ctx: &DalContext) -> ComponentResult<()> { let (source_av_id, dest_av_id) = match self { PotentialSubscription::Existing { source_av_id, dest_av_id, } => (*source_av_id, *dest_av_id), PotentialSubscription::ArrayElem { source_av_id, parent_av_id, } => { let dest_av_id = AttributeValue::insert(ctx, *parent_av_id, None, None).await?; (*source_av_id, dest_av_id) } }; let default_sub = DefaultSubscription { source_av_id, dest_av_id, }; default_sub.subscribe(ctx).await?; Ok(()) } } impl Component { pub async fn new_component_inner( ctx: &DalContext, name: impl Into<String>, schema_variant_id: SchemaVariantId, content_address: ContentHash, default_sources: Option<&[DefaultSubscriptionSource]>, ) -> ComponentResult<Self> { let name: String = name.into(); let component_schema_name = SchemaVariant::schema_for_schema_variant_id(ctx, schema_variant_id) .await? 
.name() .to_owned(); let mut component_av_graph = DependencyGraph::new(); let workspace_snapshot = ctx.workspace_snapshot()?; let id = workspace_snapshot.generate_ulid().await?; let lineage_id = workspace_snapshot.generate_ulid().await?; let node_weight = NodeWeight::new_component(id, lineage_id, content_address); // Attach component to category and add use edge to schema variant workspace_snapshot.add_or_replace_node(node_weight).await?; // Root --> Component Category --> Component (this) let component_category_id = workspace_snapshot .get_category_node_or_err(CategoryNodeKind::Component) .await?; Self::add_category_edge( ctx, component_category_id, id.into(), EdgeWeightKind::new_use(), ) .await?; // Create attribute values for all socket corresponding to input and output sockets. for input_socket_id in InputSocket::list_ids_for_schema_variant(ctx, schema_variant_id).await? { let attribute_value = AttributeValue::new(ctx, input_socket_id, Some(id.into()), None, None).await?; DependentValueRoot::add_dependent_value_root( ctx, DependentValueRoot::Unfinished(attribute_value.id().into()), ) .await?; } for output_socket_id in OutputSocket::list_ids_for_schema_variant(ctx, schema_variant_id).await? { let attribute_value = AttributeValue::new(ctx, output_socket_id, Some(id.into()), None, None).await?; DependentValueRoot::add_dependent_value_root( ctx, DependentValueRoot::Unfinished(attribute_value.id().into()), ) .await?; } // Walk all the props for the schema variant and create attribute values for all of them let root_prop_id = SchemaVariant::get_root_prop_id(ctx, schema_variant_id).await?; let mut work_queue = VecDeque::from([(root_prop_id, None::<AttributeValueId>, None)]); let mut default_subscriptions = BTreeMap::new(); while let Some((prop_id, maybe_parent_attribute_value_id, key)) = work_queue.pop_front() { // If we came in with a key, we're the child of a map. 
We should not descend deeper // into it because the value should be governed by its prototype function and will // create child values when that function is executed let should_descend = key.is_none(); let prop_weight = workspace_snapshot .get_node_weight(prop_id) .await? .get_prop_node_weight()?; let prop_kind = prop_weight.kind(); // Create an attribute value for the prop. let attribute_value = AttributeValue::new( ctx, prop_id, Some(id.into()), maybe_parent_attribute_value_id, key.clone(), ) .await?; component_av_graph.add_id(attribute_value.id()); if let Some(parent_av_id) = maybe_parent_attribute_value_id { component_av_graph.id_depends_on(parent_av_id, attribute_value.id()); } if let Some(source_av_id) = detect_possible_default_subscription_for_prop( ctx, component_schema_name.clone(), prop_id, default_sources, ) .await? { let dest_av_id = attribute_value.id(); default_subscriptions.insert( dest_av_id, PotentialSubscription::Existing { source_av_id, dest_av_id, }, ); } if ValidationOutput::get_format_for_attribute_value_id(ctx, attribute_value.id()) .await? .is_some() { ctx.enqueue_compute_validations(attribute_value.id()) .await?; } match prop_kind { PropKind::Object => { if should_descend { let ordered_children = workspace_snapshot .ordered_children_for_node(prop_id) .await? .ok_or(ComponentError::ObjectPropHasNoOrderingNode(prop_id))?; for child_prop_id in ordered_children { work_queue.push_back(( child_prop_id.into(), Some(attribute_value.id()), None, )); } } } PropKind::Map => { let element_prop_id = Prop::direct_single_child_prop_id(ctx, prop_id).await?; if should_descend { for (key, _) in Prop::prototypes_by_key(ctx, element_prop_id).await? 
{ if key.is_some() { work_queue.push_back(( element_prop_id, Some(attribute_value.id()), key, )) } } } } PropKind::Array => { let element_prop_id = Prop::direct_single_child_prop_id(ctx, prop_id).await?; if let Some(source_av_id) = detect_possible_default_subscription_for_prop( ctx, component_schema_name.clone(), element_prop_id, default_sources, ) .await? { let parent_av_id = attribute_value.id(); default_subscriptions.insert( parent_av_id, PotentialSubscription::ArrayElem { source_av_id, parent_av_id, }, ); } } // We want to only add leaves to the DVU roots _ => { DependentValueRoot::add_dependent_value_root( ctx, DependentValueRoot::Unfinished(attribute_value.id().into()), ) .await?; } } } let (node_weight, content) = Self::get_node_weight_and_content(ctx, id.into()).await?; let component = Self::assemble(&node_weight, content); // Component (this) --> Schema Variant Component::add_edge_to_schema_variant( ctx, component.id, schema_variant_id, EdgeWeightKind::new_use(), ) .await?; component.set_name(ctx, &name).await?; //set a component specific prototype for the root/si/type as we don't want it to ever change other than manually let sv_type = SchemaVariant::get_by_id(ctx, schema_variant_id) .await? .get_type(ctx) .await?; if let Some(sv_type) = sv_type { component.set_type(ctx, sv_type).await?; } // Create all the default subscriptions discovered while creating the // component attribute tree for (dest_av_id, potential_sub) in &default_subscriptions { // Use the dep graph to determine if a parent value of the // destination attribute has a subscription or is set by a dynamic // function let all_parents_of = component_av_graph.all_parents_of(*dest_av_id); let mut has_parent_with_sub_or_dynamic_func = false; for parent_av_id in all_parents_of { if default_subscriptions.contains_key(&parent_av_id) { has_parent_with_sub_or_dynamic_func = true; break; } if AttributeValue::is_set_by_dependent_function(ctx, parent_av_id).await? 
{ has_parent_with_sub_or_dynamic_func = true; break; } } if has_parent_with_sub_or_dynamic_func { continue; } potential_sub.subscribe(ctx).await?; } // Find the create action prototype for the schema or variant and enqueue the create action for prototype in ActionPrototype::find_by_kind_for_schema_or_variant( ctx, ActionKind::Create, schema_variant_id, ) .await? { Action::new(ctx, prototype.id(), Some(component.id)).await?; } // Clear the prop suggestion cache's mapping // TODO: we should just *add* the new component to the mapping and then // determine if we need to process the schema variant (if it's a new // one) ctx.workspace_snapshot()? .clear_prop_suggestions_cache() .await; Ok(component) } /// Create new component node but retain existing content address /// This is used to create the replacement nodes on upgrade, so geometries for it need /// to be created by hand. Anywhere else you want to use [Self::new](Self::new) pub async fn new_with_content_address_and_no_geometry_no_default_subscriptions( ctx: &DalContext, name: impl Into<String>, schema_variant_id: SchemaVariantId, content_address: ContentHash, ) -> ComponentResult<Self> { Self::new_component_inner(ctx, name, schema_variant_id, content_address, None).await } #[instrument( name = "component.new", level = "info", skip_all, fields( schema_variant.id = ?schema_variant_id ))] pub async fn new( ctx: &DalContext, name: impl Into<String>, schema_variant_id: SchemaVariantId, view_id: ViewId, ) -> ComponentResult<Self> { let content = ComponentContentV2 { timestamp: Timestamp::now(), }; let (hash, _) = ctx.layer_db().cas().write( Arc::new(ComponentContent::V2(content.clone()).into()), None, ctx.events_tenancy(), ctx.events_actor(), )?; let sources = AttributeValue::get_default_subscription_sources(ctx).await?; let component = Self::new_component_inner( ctx, name, schema_variant_id, hash, if sources.is_empty() { None } else { Some(sources.as_slice()) }, ) .await?; // Create geometry node 
Geometry::new_for_component(ctx, component.id, view_id).await?; Ok(component) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.