Skip to main content
Glama
dependent_value_graph.rs (33.8 kB)
use std::{ collections::{ BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, btree_map, hash_map::Entry, }, fs::File, io::Write as _, sync::Arc, }; use petgraph::prelude::*; use si_events::ulid::Ulid; use si_id::{ LeafPrototypeId, SchemaId, }; use telemetry::prelude::*; use super::{ AttributeValue, AttributeValueId, AttributeValueResult, subscription::ValueSubscription, }; use crate::{ Component, ComponentId, DalContext, EdgeWeightKind, Func, Prop, PropKind, Secret, attribute::{ prototype::{ AttributePrototype, argument::AttributePrototypeArgument, }, value::ValueIsFor, }, component::ControllingFuncData, dependency_graph::DependencyGraph, schema::leaf::LeafPrototype, workspace_snapshot::{ DependentValueRoot, WorkspaceSnapshotSelector, edge_weight::EdgeWeightKindDiscriminants, node_weight::NodeWeightDiscriminants, }, }; #[derive(Copy, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum DependentValue { AttributeValue(AttributeValueId), OverlayDestination { leaf_prototype_id: LeafPrototypeId, destination_map_id: AttributeValueId, destination_element_id: Option<AttributeValueId>, root_attribute_value_id: AttributeValueId, }, } impl DependentValue { pub fn attribute_value_id(&self) -> AttributeValueId { match self { DependentValue::AttributeValue(attribute_value_id) => *attribute_value_id, DependentValue::OverlayDestination { destination_map_id, destination_element_id, .. 
} => (*destination_element_id).unwrap_or(*destination_map_id), } } } impl From<DependentValue> for Ulid { fn from(value: DependentValue) -> Self { value.attribute_value_id().into() } } impl From<DependentValue> for AttributeValueId { fn from(value: DependentValue) -> Self { value.attribute_value_id() } } #[derive(Debug, Clone, Default)] pub struct DependentValueGraph { inner: DependencyGraph<DependentValue>, values_that_need_to_execute_from_prototype_function: BTreeSet<DependentValue>, component_id_for_av: BTreeMap<AttributeValueId, ComponentId>, root_attribute_values_for_component: BTreeMap<ComponentId, AttributeValueId>, schema_id_for_component: BTreeMap<ComponentId, SchemaId>, leaf_prototypes_for_schema: BTreeMap<SchemaId, Arc<Vec<(LeafPrototype, String)>>>, leaf_overlays_for_component: BTreeMap<ComponentId, Arc<Vec<ComponentLeaf>>>, leaf_element_ids: BTreeMap<AttributeValueId, ComponentLeaf>, } // We specifically need to track if the value is one of the child values we // added to the graph in order to discover if a dynamically set object's // children are inputs to a function. The other two parts of this enum are not // used now, but may be useful information when debugging enum WorkQueueValue { Initial(AttributeValueId), ObjectChild(AttributeValueId), Discovered(AttributeValueId), } #[derive(Debug, Clone)] struct ComponentLeaf { destination_map_id: AttributeValueId, destination_element_id: Option<AttributeValueId>, inputs: Vec<AttributeValueId>, prototype_id: LeafPrototypeId, } impl WorkQueueValue { #[inline(always)] fn id(&self) -> AttributeValueId { match self { WorkQueueValue::Initial(id) | WorkQueueValue::ObjectChild(id) | WorkQueueValue::Discovered(id) => *id, } } } impl DependentValueGraph { /// Construct a [`DependentValueGraph`] of all the [`AttributeValueIds`](AttributeValue) who are /// dependent on the initial ids provided as well as all descending dependencies. 
pub async fn new( ctx: &DalContext, roots: Vec<DependentValueRoot>, ) -> AttributeValueResult<Self> { let mut dependent_value_graph = Self::default(); let values = dependent_value_graph.parse_initial_ids(ctx, roots).await?; dependent_value_graph .populate_for_values(ctx, values) .await?; Ok(dependent_value_graph) } /// Parse the set of initial ids in order to construct the list of [`values`](WorkQueueValue). async fn parse_initial_ids( &mut self, ctx: &DalContext, roots: Vec<DependentValueRoot>, ) -> AttributeValueResult<Vec<WorkQueueValue>> { let workspace_snapshot = ctx.workspace_snapshot()?; let mut values = Vec::with_capacity(roots.len()); for root in roots { let root_ulid: Ulid = root.into(); // It's possible that one or more of the initial ids provided by the enqueued // DependentValuesUpdate job may have been removed from the snapshot between when the // DVU job was created and when we're processing things now. This could happen if there // are other modifications to the snapshot before the DVU job starts executing, as the // job always operates on the current state of the change set's snapshot, not the state // at the time the job was created. 
if !workspace_snapshot.node_exists(root_ulid).await { debug!(%root_ulid, "missing node, skipping it in DependentValueGraph"); continue; } let node_weight = workspace_snapshot.get_node_weight(root_ulid).await?; match node_weight.into() { NodeWeightDiscriminants::AttributeValue => { let initial_attribute_value_id: AttributeValueId = root_ulid.into(); let component_id = self .component_id_for_av(ctx, initial_attribute_value_id) .await?; let root_attribute_value_id = self .root_attribute_value_for_component(ctx, component_id) .await?; // We don't need this result here, we just need to calculate the // data to setup the cached maps self.leaf_overlays_for_component(ctx, component_id).await?; // If this is a leaf overlay destination element (the k/v // pair for the result of a leaf) then we want to be sure we // re-execute it if let Some(leaf) = self.leaf_element_ids.get(&initial_attribute_value_id) { let overlay_destination = DependentValue::OverlayDestination { leaf_prototype_id: leaf.prototype_id, destination_map_id: leaf.destination_map_id, destination_element_id: leaf.destination_element_id, root_attribute_value_id, }; self.values_needs_to_execute_from_prototype_function(overlay_destination); self.inner.add_id(overlay_destination); } else { if AttributeValue::is_set_by_dependent_function( ctx, initial_attribute_value_id, ) .await? { self.values_that_need_to_execute_from_prototype_function .insert(DependentValue::AttributeValue(initial_attribute_value_id)); } self.inner .add_id(DependentValue::AttributeValue(initial_attribute_value_id)); } values.push(WorkQueueValue::Initial(initial_attribute_value_id)); } NodeWeightDiscriminants::Secret => { // If we are processing a secret, we don't want to add the secret itself. We // only want to add the direct dependent attribute values. We also need to mark // these values as needing to be processed because DVU will skip processing the // first set of independent values. In most cases, we want that to happen. 
// However, since the first set of independent values all have a secret as their // parent and not an attribute value, they need to be processed in DVU. let direct_dependents = Secret::direct_dependent_attribute_values(ctx, root_ulid.into()) .await .map_err(Box::new)?; self.values_that_need_to_execute_from_prototype_function .extend( direct_dependents .iter() .map(|id| DependentValue::AttributeValue(*id)), ); values.extend( direct_dependents .iter() .map(|d| WorkQueueValue::Initial(*d)), ); } discrim => { warn!(%discrim, %root_ulid, "skipping dependent value graph generation for unsupported node weight"); } }; } Ok(values) } async fn process_subscription_dependencies( &mut self, ctx: &DalContext, component_root_attribute_value_id: AttributeValueId, workspace_snapshot: &WorkspaceSnapshotSelector, controlling_funcs_for_component: &mut HashMap< ComponentId, HashMap<AttributeValueId, ControllingFuncData>, >, ) -> AttributeValueResult<Vec<WorkQueueValue>> { let current_component_id = self .component_id_for_av(ctx, component_root_attribute_value_id) .await?; let mut new_work_queue_values = vec![]; for (edge, subscriber_apa_id, _subscribed_to_av_id) in workspace_snapshot .edges_directed(component_root_attribute_value_id, Direction::Incoming) .await? { if let EdgeWeightKind::ValueSubscription(path) = edge.kind { let subscription = ValueSubscription { attribute_value_id: component_root_attribute_value_id, path, }; if let Some(resolved_av_id) = subscription.resolve(ctx).await? { if let Some(subscriber_ap_id) = workspace_snapshot .source_opt(subscriber_apa_id, EdgeWeightKind::PrototypeArgument) .await? { for subscriber_av_ulid in workspace_snapshot .incoming_sources_for_edge_weight_kind( subscriber_ap_id, EdgeWeightKindDiscriminants::Prototype, ) .await? 
{ let subscriber_av_id = subscriber_av_ulid.into(); let subscriber_component_id = self.component_id_for_av(ctx, subscriber_av_id).await?; let subscriber_controlling_av_id = Self::get_controlling_attribute_value_id( ctx, subscriber_component_id, subscriber_av_id, controlling_funcs_for_component, ) .await?; let resolved_controlling_av_id = Self::get_controlling_attribute_value_id( ctx, current_component_id, resolved_av_id, controlling_funcs_for_component, ) .await?; new_work_queue_values .push(WorkQueueValue::Discovered(subscriber_av_id)); if subscriber_component_id != current_component_id && !Component::should_data_flow_between_components( ctx, subscriber_component_id, current_component_id, ) .await? { continue; } self.value_depends_on( DependentValue::AttributeValue(subscriber_controlling_av_id), DependentValue::AttributeValue(resolved_controlling_av_id), ); } } } } } Ok(new_work_queue_values) } /// Populate the [`DependentValueGraph`] using the provided [`values`](WorkQueueValue). This /// includes the entire parent tree of each value discovered, up to the root for every value's /// component, as well as any dependencies of values discovered while walking the graph /// (e.g. if a value's prototype takes one of the passed values as an input, we also need to /// find the values for the other inputs to the prototype, etc.). 
async fn populate_for_values( &mut self, ctx: &DalContext, values: Vec<WorkQueueValue>, ) -> AttributeValueResult<()> { let workspace_snapshot = ctx.workspace_snapshot()?; let mut controlling_funcs_for_component = HashMap::new(); let mut work_queue = VecDeque::from_iter(values); let mut seen_list = HashSet::new(); while let Some(current_attribute_value) = work_queue.pop_front() { if seen_list.contains(&current_attribute_value.id()) { continue; } seen_list.insert(current_attribute_value.id()); let current_component_id = self .component_id_for_av(ctx, current_attribute_value.id()) .await?; let root_attribute_value_id = self .root_attribute_value_for_component(ctx, current_component_id) .await?; // Gather up the leaf overlays (if any) for this component to see if // the current attribute value is an input to them, then add it to // the graph if so let overlay_leaves = self .leaf_overlays_for_component(ctx, current_component_id) .await?; for leaf in overlay_leaves.iter() { if leaf.inputs.contains(&current_attribute_value.id()) { let destination_map_value = DependentValue::OverlayDestination { leaf_prototype_id: leaf.prototype_id, destination_map_id: leaf.destination_map_id, destination_element_id: leaf.destination_element_id, root_attribute_value_id, }; self.value_depends_on( destination_map_value, DependentValue::AttributeValue(current_attribute_value.id()), ); } } if current_attribute_value.id() == root_attribute_value_id { let new_work_queue_values = self .process_subscription_dependencies( ctx, current_attribute_value.id(), &workspace_snapshot, &mut controlling_funcs_for_component, ) .await?; work_queue.extend(new_work_queue_values); } // We need to be sure to only construct the graph out of // "controlling" values. However, controlled values can still be // inputs to functions, so we need to find the prototypes that // depend on them! 
let current_attribute_value_controlling_value_id = Self::get_controlling_attribute_value_id( ctx, current_component_id, current_attribute_value.id(), &mut controlling_funcs_for_component, ) .await?; let value_is_for = AttributeValue::is_for(ctx, current_attribute_value.id()).await?; // Gather the Attribute Prototype Arguments that take the thing the // current value is for (prop, or socket) as an input let relevant_apas = { let attribute_prototype_argument_idxs = workspace_snapshot .incoming_sources_for_edge_weight_kind( value_is_for, EdgeWeightKindDiscriminants::PrototypeArgumentValue, ) .await?; let mut relevant_apas = vec![]; for apa_idx in attribute_prototype_argument_idxs { let apa = workspace_snapshot .get_node_weight(apa_idx) .await? .get_attribute_prototype_argument_node_weight()?; // If there are no targets, this is a schema-level attribute prototype argument // TODO (jkeiser) the above comment is false; there can be apas on component-specific // prototypes. Presumably we check this elsewhere. relevant_apas.push(apa); } relevant_apas }; if let ValueIsFor::Prop(prop_id) = value_is_for { let prop = Prop::get_by_id(ctx, prop_id).await?; if prop.kind == PropKind::Object { // The children of an object might themselves be the // input to another function, so we have to add them to // the calculation of the graph, as we encounter them. // We use `seen_list` to ensure we don't reprocess these // values or the parents of these values. for child_value_id in AttributeValue::get_child_av_ids_in_order(ctx, current_attribute_value.id()) .await? { if !seen_list.contains(&child_value_id) { work_queue.push_back(WorkQueueValue::ObjectChild(child_value_id)); } } } } // Find the values that are set by the prototype for the relevant // AttributePrototypeArguments, and declare that these values depend // on the value of the current value // // TODO: This code is very expensive, especially as the graph grows. 
for apa in relevant_apas { let prototype_id = AttributePrototypeArgument::prototype_id(ctx, apa.id().into()).await?; let attribute_value_ids = AttributePrototype::attribute_value_ids(ctx, prototype_id).await?; for attribute_value_id in attribute_value_ids { let component_id = self.component_id_for_av(ctx, attribute_value_id).await?; if component_id != current_component_id { continue; } // If the input to this function is a value that is a child of another dynamic // function, we should just depend on the controlling value in the dependency // graph, since we can't guarantee that the controlled value won't be destroyed // when "populating" nested value let controlling_attribute_value_id = Self::get_controlling_attribute_value_id( ctx, component_id, attribute_value_id, &mut controlling_funcs_for_component, ) .await?; work_queue .push_back(WorkQueueValue::Discovered(controlling_attribute_value_id)); self.value_depends_on( DependentValue::AttributeValue(controlling_attribute_value_id), DependentValue::AttributeValue( current_attribute_value_controlling_value_id, ), ); } } // We have to be sure to process the parent of every value we // discover for any dependencies might have since changes to // children mean changes to parents. Suppose for example an // output socket depends on the domain prop, we have to be sure // we add that output socket to the graph if a child of domain // changes, because if a child of a value has changed, then the // view of that value to the leaves will also have changed. if let Some(parent_attribute_value_id) = AttributeValue::parent_id(ctx, current_attribute_value_controlling_value_id).await? { work_queue.push_back(WorkQueueValue::Discovered(parent_attribute_value_id)); } } // Parent props always depend on their children. // Walking to the root for every value discovered // in the loop above ensures we construct a single // DVU connected graph for every tree of dependencies. 
let all_ids: Vec<_> = self.inner.all_ids().collect(); for id in all_ids { let mut cursor = id; while let Some(parent_av_id) = AttributeValue::parent_id(ctx, cursor.attribute_value_id()).await? { let parent_value = DependentValue::AttributeValue(parent_av_id); self.value_depends_on(parent_value, cursor); cursor = parent_value; } } Ok(()) } async fn leaf_overlays_for_component( &mut self, ctx: &DalContext, component_id: ComponentId, ) -> AttributeValueResult<Arc<Vec<ComponentLeaf>>> { if let Some(existing) = self.leaf_overlays_for_component.get(&component_id) { return Ok(existing.clone()); } let schema_id = self.schema_id_for_component(ctx, component_id).await?; let root_attribute_value_id = self .root_attribute_value_for_component(ctx, component_id) .await?; let prototypes = self.leaf_prototypes_for_schema_id(ctx, schema_id).await?; let mut component_leaves = vec![]; for (prototype, func_name) in prototypes.as_ref() { let inputs = prototype .resolve_inputs(ctx, root_attribute_value_id) .await .map_err(Box::new)?; let destination_map_id = prototype .resolve_output_map(ctx, root_attribute_value_id) .await .map_err(Box::new)?; let destination_element_id = AttributeValue::map_child_opt(ctx, destination_map_id, func_name).await?; let leaf = ComponentLeaf { destination_map_id, destination_element_id, inputs, prototype_id: prototype.id(), }; if let Some(destination_element_id) = destination_element_id { self.leaf_element_ids .insert(destination_element_id, leaf.clone()); } component_leaves.push(leaf); } let arcvec = Arc::new(component_leaves); self.leaf_overlays_for_component .insert(component_id, arcvec.clone()); Ok(arcvec) } async fn leaf_prototypes_for_schema_id( &mut self, ctx: &DalContext, schema_id: SchemaId, ) -> AttributeValueResult<Arc<Vec<(LeafPrototype, String)>>> { Ok(match self.leaf_prototypes_for_schema.entry(schema_id) { btree_map::Entry::Vacant(vacant_entry) => { let mut prototypes_and_func_names = vec![]; let prototypes = LeafPrototype::for_schema(ctx, 
schema_id) .await .map_err(Box::new)?; for proto in prototypes { let func_id = LeafPrototype::func_id(ctx, proto.id()) .await .map_err(Box::new)?; let func = Func::get_by_id(ctx, func_id).await?; prototypes_and_func_names.push((proto, func.name)); } let arcvec = Arc::new(prototypes_and_func_names); vacant_entry.insert(arcvec.clone()); arcvec } btree_map::Entry::Occupied(occupied_entry) => occupied_entry.get().clone(), }) } async fn schema_id_for_component( &mut self, ctx: &DalContext, component_id: ComponentId, ) -> AttributeValueResult<SchemaId> { Ok(match self.schema_id_for_component.entry(component_id) { btree_map::Entry::Vacant(vacant_entry) => { let schema_id = Component::schema_id_for_component_id(ctx, component_id).await?; vacant_entry.insert(schema_id); schema_id } btree_map::Entry::Occupied(occupied_entry) => *occupied_entry.get(), }) } async fn root_attribute_value_for_component( &mut self, ctx: &DalContext, component_id: ComponentId, ) -> AttributeValueResult<AttributeValueId> { Ok( match self.root_attribute_values_for_component.entry(component_id) { btree_map::Entry::Vacant(vacant_entry) => { let root_av_id = Component::root_attribute_value_id(ctx, component_id).await?; vacant_entry.insert(root_av_id); root_av_id } btree_map::Entry::Occupied(occupied_entry) => *occupied_entry.get(), }, ) } async fn component_id_for_av( &mut self, ctx: &DalContext, value_id: AttributeValueId, ) -> AttributeValueResult<ComponentId> { Ok(match self.component_id_for_av.entry(value_id) { btree_map::Entry::Vacant(vacant_entry) => { let component_id = AttributeValue::component_id(ctx, value_id).await?; vacant_entry.insert(component_id); component_id } btree_map::Entry::Occupied(occupied_entry) => *occupied_entry.get(), }) } /// Return the cached record of the component id that this value belongs to. /// Once a dependent value graph has been constructed, this will always /// return `Some(_)` if the value has been taken from this graph. 
pub fn cached_component_id_for_value(&self, value: DependentValue) -> Option<ComponentId> { let attribute_value_id = value.attribute_value_id(); self.component_id_for_av.get(&attribute_value_id).copied() } #[allow(clippy::disallowed_methods)] pub async fn debug_dot(&self, ctx: &DalContext, suffix: Option<&str>) { let mut is_for_map = BTreeMap::new(); let mut component_name_map = BTreeMap::new(); for dependent_value in self.inner.id_to_index_map().keys() { let av_id = dependent_value.attribute_value_id(); let component_id = AttributeValue::component_id(ctx, av_id) .await .expect("get component id for av"); let component = Component::get_by_id(ctx, component_id) .await .expect("get component"); component_name_map.insert( *dependent_value, component .name(ctx) .await .expect("able to get component name"), ); let is_for = AttributeValue::is_for(ctx, av_id) .await .expect("able to get value is for") .debug_info(ctx) .await .expect("able to get info for value is for"); is_for_map.insert(*dependent_value, is_for); } let label_value_fn = move |_: &StableDiGraph<DependentValue, ()>, (_, dependent_value): (NodeIndex, &DependentValue)| { let dependent_value = *dependent_value; let is_for = is_for_map.clone(); let is_for_string = is_for .clone() .get(&dependent_value) .map(ToOwned::to_owned) .expect("is for exists for every value"); let component_name = component_name_map .get(&dependent_value) .map(ToOwned::to_owned) .expect("component name exists for every value"); let dep_value_string = match dependent_value { DependentValue::AttributeValue(attribute_value_id) => { attribute_value_id.to_string() } DependentValue::OverlayDestination { destination_map_id, destination_element_id, .. 
} => format!( "leaf({})", destination_element_id.unwrap_or(destination_map_id), ), }; format!("label = \"{component_name}\n{dep_value_string}\n{is_for_string}\"") }; let dot = petgraph::dot::Dot::with_attr_getters( self.inner.graph(), &[ petgraph::dot::Config::NodeNoLabel, petgraph::dot::Config::EdgeNoLabel, ], &|_, _| "label = \"\"".to_string(), &label_value_fn, ); let filename = format!("{}-{}.txt", Ulid::new(), suffix.unwrap_or("depgraph")); let home_env = std::env::var("HOME").expect("No HOME environment variable set"); let home = std::path::Path::new(&home_env); let mut file = File::create(home.join(&filename)).expect("could not create file"); file.write_all(format!("{dot:?}").as_bytes()) .expect("could not write file"); println!("dot output stored in file (filename without extension: {filename})"); } async fn get_controlling_attribute_value_id( ctx: &DalContext, current_component_id: ComponentId, current_attribute_value_id: AttributeValueId, controlling_funcs_for_component: &mut HashMap< ComponentId, HashMap<AttributeValueId, ControllingFuncData>, >, ) -> AttributeValueResult<AttributeValueId> { Ok( match controlling_funcs_for_component.entry(current_component_id) { Entry::Vacant(entry) => { let controlling_func_data = Component::list_av_controlling_func_ids_for_id(ctx, current_component_id) .await .map_err(Box::new)?; let data = controlling_func_data .get(&current_attribute_value_id) .copied(); entry.insert(controlling_func_data); data } Entry::Occupied(entry) => entry.get().get(&current_attribute_value_id).copied(), } .map(|func_data| func_data.av_id) // If nothing controls us, we control ourselves .unwrap_or(current_attribute_value_id), ) } pub fn value_depends_on(&mut self, value: DependentValue, depends_on: DependentValue) { self.inner.id_depends_on(value, depends_on); } pub fn contains_value(&self, value_id: DependentValue) -> bool { self.inner.contains_id(value_id) } pub fn direct_dependencies_of(&self, value_id: DependentValue) -> Vec<DependentValue> 
{ self.inner.direct_dependencies_of(value_id) } pub fn remove_value(&mut self, value: DependentValue) { self.inner.remove_id(value); } pub fn cycle_on_self(&mut self, value_id: DependentValue) { self.inner.cycle_on_self(value_id); } pub fn independent_values(&self) -> Vec<DependentValue> { self.inner.independent_ids() } pub fn all_value_ids(&self) -> impl Iterator<Item = DependentValue> { self.inner.all_ids() } /// Indicates whether the value needs to be processed. This is useful for determining when to /// filter or de-duplicate values when executing from their prototype functions. If the value is /// marked as needing to be processed, it likely needs to execute from its prototype function. pub fn values_needs_to_execute_from_prototype_function( &self, value_id: DependentValue, ) -> bool { self.values_that_need_to_execute_from_prototype_function .contains(&value_id) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.