Skip to main content
Glama
dependency_graph.rs13.1 kB
use std::collections::{ HashMap, HashSet, VecDeque, }; use itertools::Itertools; use petgraph::{ prelude::*, visit::{ Control, DfsEvent, }, }; use telemetry::prelude::*; use super::{ ActionError, ActionResult, prototype::{ ActionKind, ActionPrototype, }, }; use crate::{ AttributePrototype, AttributeValue, Component, ComponentId, DalContext, action::{ Action, ActionId, }, attribute::prototype::argument::AttributePrototypeArgument, dependency_graph::DependencyGraph, }; #[derive(Debug, Clone)] pub struct ActionDependencyGraph { inner: DependencyGraph<ActionId>, } impl Default for ActionDependencyGraph { fn default() -> Self { Self::new() } } impl ActionDependencyGraph { pub fn new() -> Self { Self { inner: DependencyGraph::new(), } } pub fn is_acyclic(&self) -> bool { petgraph::algo::toposort(self.inner.graph(), None).is_ok() } /// Construct an [`ActionDependencyGraph`] of all of the queued [`Action`s][crate::action::Action] /// for the current [`WorkspaceSnapshot`][crate::WorkspaceSnapshot]. #[instrument( level = "debug", name = "action.dependency_graph.for_workspace", skip(ctx) )] pub async fn for_workspace(ctx: &DalContext) -> ActionResult<Self> { // * Get all ActionId -> ComponentId mappings. // * For each of these ComponentIds (A): // * For each Input Socket: // * For each source ComponentId (B): // * All Actions for Component A depend on All actions for Component B let mut component_dependencies: StableDiGraph<ComponentId, ()> = StableDiGraph::new(); let mut component_dependencies_index_by_id: HashMap<ComponentId, NodeIndex> = HashMap::new(); // let mut component_dependencies: HashMap<ComponentId, HashSet<ComponentId>> = HashMap::new(); // let mut component_reverse_dependencies: HashMap<ComponentId, HashSet<ComponentId>> = // HashMap::new(); let mut actions_by_component_id: HashMap<ComponentId, HashSet<ActionId>> = HashMap::new(); let mut action_dependency_graph = Self::new(); let mut action_kinds: HashMap<ActionId, ActionKind> = HashMap::new(); // Need to get all actions that are still in the "queue", including those that have failed, // or are currently running. for action_id in Action::all_ids(ctx).await? { action_dependency_graph.inner.add_id(action_id); // Theoretically, we may have Actions at some point that aren't Component specific. if let Some(component_id) = Action::component_id(ctx, action_id).await? { actions_by_component_id .entry(component_id) .or_default() .insert(action_id); } let action_prototype_id = Action::prototype_id(ctx, action_id).await?; let action_prototype = ActionPrototype::get_by_id(ctx, action_prototype_id).await?; action_kinds.insert(action_id, action_prototype.kind); } // TODO: Account for explicitly defined dependencies between actions. These should be edges // directly between two Actions, but are not implemented yet. let mut components_to_process = VecDeque::from(actions_by_component_id.keys().copied().collect_vec()); // track which components we've already processed let mut seen_list: HashSet<ComponentId> = HashSet::from_iter(components_to_process.iter().copied()); // Action dependencies are primarily based on the data flow between Components. Things that // feed data into other things must have their actions run before the actions for the // things they are feeding data into. while let Some(component_id) = components_to_process.pop_front() { let component_index = component_dependencies_index_by_id .entry(component_id) .or_insert_with(|| component_dependencies.add_node(component_id)) .to_owned(); for (_, apa_id) in Component::subscribers(ctx, component_id).await? { let prototype_id = AttributePrototypeArgument::prototype_id(ctx, apa_id).await?; if let Some(av_id) = AttributePrototype::attribute_value_id(ctx, prototype_id).await? { let subscriber_id = AttributeValue::component_id(ctx, av_id).await?; // The edges of this graph go `subscribed_to_component (source) -> // subscriber_component (target)`, matching the flow of the data between // components. let subscriber_index = component_dependencies_index_by_id .entry(subscriber_id) .or_insert_with(|| component_dependencies.add_node(subscriber_id)) .to_owned(); component_dependencies.update_edge(component_index, subscriber_index, ()); if seen_list.insert(subscriber_id) { components_to_process.push_back(subscriber_id); } } } } // For each Component's Actions, mark them as depending on the Actions for the first dependency // found along each branch in the dependency graph as each Component's Actions need to be marked as // depending on the Actions that the Component itself has been determined to be depending on. for (component_id, action_ids) in &actions_by_component_id { if let Some(&component_index) = component_dependencies_index_by_id.get(component_id) { for &component_action_id in action_ids { let action_kind = action_kinds .get(&component_action_id) .copied() .ok_or(ActionError::UnableToGetKind(component_action_id))?; // Given a data flow between components of: // `Component A -> Component B` // // * `ActionKind::Destroy` for `Component A` would run _after_ `Actions` for // `Component A`. (A depends on the components from the `Outgoing` data flow // edges) // * For all other `ActionKind`, `Actions` for `Component B` would run _after_ // `Actions` for `Component A`. (`B` depends on the components from the // `Incoming` data flow edges.) let dependency_direction = match action_kind { ActionKind::Create | ActionKind::Manual | ActionKind::Refresh | ActionKind::Update => Incoming, ActionKind::Destroy => Outgoing, }; // Instead of iterating over immediate neighbors, perform a DFS to find the // first dependency (along each branch) that has an action. // Depending on the direction above, we reverse the component dependencies graph to traverse accordingly match dependency_direction { Outgoing => { petgraph::visit::depth_first_search( &component_dependencies, Some(component_index), |event| { Self::calculate_dependencies_dfs_event( event, &mut action_dependency_graph, &component_dependencies, &actions_by_component_id, component_index, component_action_id, ) }, ); } Incoming => { // For Incoming, reverse the view so that the DFS still follows outgoing edges. let reversed = petgraph::visit::Reversed(&component_dependencies); petgraph::visit::depth_first_search( &reversed, Some(component_index), |event| { Self::calculate_dependencies_dfs_event( event, &mut action_dependency_graph, &component_dependencies, &actions_by_component_id, component_index, component_action_id, ) }, ); } } } } } Ok(action_dependency_graph) } pub fn action_depends_on(&mut self, action_id: ActionId, depends_on_id: ActionId) { self.inner.id_depends_on(action_id, depends_on_id); } pub fn contains_value(&self, action_id: ActionId) -> bool { self.inner.contains_id(action_id) } /// gets what actions are directly dependent on a given action id /// ex: Create -> Update -> Delete /// graph.direct_dependencies_of(update.actionid) -> Create pub fn direct_dependencies_of(&self, action_id: ActionId) -> Vec<ActionId> { self.inner.direct_dependencies_of(action_id) } pub fn remove_action(&mut self, action_id: ActionId) { self.inner.remove_id(action_id); } pub fn cycle_on_self(&mut self, action_id: ActionId) { self.inner.cycle_on_self(action_id); } pub fn independent_actions(&self) -> Vec<ActionId> { self.inner.independent_ids() } pub fn remaining_actions(&self) -> Vec<ActionId> { self.inner.remaining_ids() } /// Gets all downstream dependencies for the provided ActionId. This includes the entire subgraph /// starting at ActionId. #[instrument(level = "debug", skip(self))] pub fn get_all_dependencies(&self, action_id: ActionId) -> Vec<ActionId> { let current_dependencies = self.inner.direct_reverse_dependencies_of(action_id); let mut all_dependencies = HashSet::new(); let mut work_queue = VecDeque::from(current_dependencies.clone()); while let Some(action) = work_queue.pop_front() { match all_dependencies.insert(action) { true => { let next = self.inner.direct_reverse_dependencies_of(action); work_queue.extend(next); } false => continue, } } all_dependencies.into_iter().collect_vec() } /// For each event, if we find a component with an action, add the dependency and prune (as we don't need to keep traversing) fn calculate_dependencies_dfs_event( event: DfsEvent<NodeIndex>, action_dependency_graph: &mut ActionDependencyGraph, component_dependencies: &StableDiGraph<ComponentId, ()>, actions_by_component_id: &HashMap<ComponentId, HashSet<ActionId>>, component_index: NodeIndex, component_action_id: ActionId, ) -> Control<()> { match event { petgraph::visit::DfsEvent::Discover(node, _) => { // Skip the starting node. if node == component_index { return petgraph::visit::Control::Continue; } // Look up the component ID from the original graph. else if let Some(dependency_component_id) = component_dependencies.node_weight(node) { if let Some(dependency_action_ids) = actions_by_component_id.get(dependency_component_id) { // Found the first dependency on this branch with an action. for &dependency_action_id in dependency_action_ids { action_dependency_graph .action_depends_on(component_action_id, dependency_action_id); } // Prune this branch; don't traverse any deeper. return petgraph::visit::Control::Prune; } } petgraph::visit::Control::Continue } _ => petgraph::visit::Control::Continue, } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server