Skip to main content
Glama
subgraph.rs (27.3 kB)
use std::{ collections::{ BTreeMap, HashMap, HashSet, }, io::Write, time::Instant, }; use petgraph::prelude::*; use serde::{ Deserialize, Serialize, }; use si_events::merkle_tree_hash::MerkleTreeHash; use telemetry::prelude::*; use crate::{ CustomEdgeWeight, CustomNodeWeight, EdgeKind, SplitGraphEdgeWeight, SplitGraphEdgeWeightKind, SplitGraphError, SplitGraphNodeId, SplitGraphNodeWeight, SplitGraphResult, updates::ExternalSourceData, }; pub type SubGraphNodeIndex = NodeIndex<usize>; pub type SubGraphEdgeIndex = EdgeIndex<usize>; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SubGraph<N, E, K> where N: CustomNodeWeight, E: CustomEdgeWeight<K>, K: EdgeKind, { pub(crate) graph: StableDiGraph<SplitGraphNodeWeight<N>, SplitGraphEdgeWeight<E, K, N>, usize>, pub(crate) node_index_by_id: BTreeMap<SplitGraphNodeId, SubGraphNodeIndex>, pub(crate) node_indexes_by_lineage_id: HashMap<SplitGraphNodeId, HashSet<SubGraphNodeIndex>>, pub(crate) root_index: SubGraphNodeIndex, #[serde(skip)] pub(crate) touched_nodes: HashSet<SubGraphNodeIndex>, } impl<N, E, K> Default for SubGraph<N, E, K> where N: CustomNodeWeight, E: CustomEdgeWeight<K>, K: EdgeKind, { fn default() -> Self { Self::new() } } impl<N, E, K> SubGraph<N, E, K> where N: CustomNodeWeight, E: CustomEdgeWeight<K>, K: EdgeKind, { pub(crate) fn new() -> Self { Self { graph: StableDiGraph::with_capacity(32768, 32768 * 2), node_index_by_id: BTreeMap::new(), node_indexes_by_lineage_id: HashMap::new(), root_index: NodeIndex::new(0), touched_nodes: HashSet::new(), } } pub fn graph( &self, ) -> &StableDiGraph<SplitGraphNodeWeight<N>, SplitGraphEdgeWeight<E, K, N>, usize> { &self.graph } pub fn root_id(&self) -> Option<SplitGraphNodeId> { self.graph .node_weight(self.root_index) .map(|node| node.id()) } pub(crate) fn new_with_root() -> Self { let mut subgraph = Self { graph: StableDiGraph::with_capacity(32768, 32768 * 2), node_index_by_id: BTreeMap::new(), node_indexes_by_lineage_id: HashMap::new(), root_index: 
NodeIndex::new(0), touched_nodes: HashSet::new(), }; let root_id = SplitGraphNodeId::new(); let root_index = subgraph.graph.add_node(SplitGraphNodeWeight::SubGraphRoot { id: root_id, merkle_tree_hash: MerkleTreeHash::nil(), }); subgraph.node_index_by_id.insert(root_id, root_index); subgraph.root_index = root_index; subgraph } /// Remove any nodes with no incoming edges from the graph, returning the ids of removed nodes. /// Note that this does not automatically cascade to remove nodes that were orphaned by the /// removal of the first "layer" of orphaned nodes. This function is intended to be used /// by the `SiSplitGraph::cleanup` call to remove orphaned nodes along with ExternalSource /// edges in *other* subgraphs that might point to the orphaned nodes. /// That method will call this in a loop until no orphaned nodes remain. pub(crate) fn remove_externals(&mut self) -> Vec<SplitGraphNodeId> { let mut removed_ids = vec![]; let mut indexes_to_remove = vec![]; for external in self .graph .externals(Incoming) .filter(|idx| *idx != self.root_index) { if let Some(node_id) = self.graph.node_weight(external).map(|node| node.id()) { removed_ids.push(node_id); } indexes_to_remove.push(external); } for index in indexes_to_remove { self.graph.remove_node(index); } removed_ids } pub fn node_weight(&self, node_id: SplitGraphNodeId) -> Option<&SplitGraphNodeWeight<N>> { self.node_id_to_index(node_id) .and_then(|index| self.graph.node_weight(index)) } pub fn cleanup_maps(&mut self) { self.node_index_by_id .retain(|_, index| self.graph.node_weight(*index).is_some()); self.node_indexes_by_lineage_id .iter_mut() .for_each(|(_, node_indexes)| { node_indexes.retain(|index| self.graph.node_weight(*index).is_some()); }); self.node_indexes_by_lineage_id .retain(|_, indexes| !indexes.is_empty()); } pub(crate) fn add_ids_to_indexes( &mut self, node_id: SplitGraphNodeId, lineage_id: SplitGraphNodeId, node_index: SubGraphNodeIndex, ) { self.node_index_by_id.insert(node_id, node_index); 
self.node_indexes_by_lineage_id .entry(lineage_id) .and_modify(|set| { set.insert(node_index); }) .or_insert(HashSet::from([node_index])); } pub(crate) fn remove_ids_from_indexes( &mut self, node_id: SplitGraphNodeId, lineage_id: SplitGraphNodeId, ) { // println!("removing {:?} from indexes", node_id); let node_index = self.node_index_by_id.remove(&node_id); if let Some(node_index) = node_index { if let Some(lineage_indexes) = self.node_indexes_by_lineage_id.get_mut(&lineage_id) { lineage_indexes.retain(|idx| *idx != node_index); } } } pub(crate) fn add_node(&mut self, node: SplitGraphNodeWeight<N>) -> SubGraphNodeIndex { let node_id = node.id(); let lineage_id = node.lineage_id(); let node_index = self.graph.add_node(node); self.add_ids_to_indexes(node_id, lineage_id, node_index); self.touched_nodes.insert(node_index); node_index } pub(crate) fn replace_node( &mut self, index: SubGraphNodeIndex, node: SplitGraphNodeWeight<N>, ) -> Option<SplitGraphNodeId> { let new_node_id = node.id(); let new_lineage_id = node.lineage_id(); let previous_ids = match self.graph.node_weight_mut(index) { Some(existing_node_ref) => { let node_id = existing_node_ref.id(); let lineage_id = existing_node_ref.id(); *existing_node_ref = node; Some((node_id, lineage_id)) } None => None, }; if let Some((previous_node_id, previous_lineage_id)) = previous_ids { if previous_node_id != new_node_id { self.remove_ids_from_indexes(previous_node_id, previous_lineage_id); self.add_ids_to_indexes(new_node_id, new_lineage_id, index); } } self.touched_nodes.insert(index); previous_ids.map(|(node_id, _)| node_id) } fn edge_exists( &self, from_index: SubGraphNodeIndex, edge_weight: &SplitGraphEdgeWeight<E, K, N>, to_index: SubGraphNodeIndex, ) -> bool { self.graph .edges_connecting(from_index, to_index) .any(|edge_ref| match edge_ref.weight() { SplitGraphEdgeWeight::Custom(custom_edge) => { Some(custom_edge.kind()) == edge_weight.custom().map(|e| e.kind()) } SplitGraphEdgeWeight::ExternalSource { 
source_id, edge_kind, .. } => match &edge_weight { SplitGraphEdgeWeight::ExternalSource { source_id: new_source_id, edge_kind: new_edge_kind, .. } => source_id == new_source_id && edge_kind == new_edge_kind, _ => false, }, SplitGraphEdgeWeight::Ordering => { matches!(edge_weight, SplitGraphEdgeWeight::Ordering) } SplitGraphEdgeWeight::Ordinal => { matches!(edge_weight, SplitGraphEdgeWeight::Ordinal) } }) } pub(crate) fn ordering_node_for_node_index( &self, node_index: SubGraphNodeIndex, ) -> Option<SubGraphNodeIndex> { let ordering_node_index = self .graph .edges_directed(node_index, Outgoing) .find(|edge_ref| matches!(edge_ref.weight(), SplitGraphEdgeWeight::Ordering)) .map(|edge_ref| edge_ref.target())?; if let SplitGraphNodeWeight::Ordering { .. } = self.graph.node_weight(ordering_node_index)? { Some(ordering_node_index) } else { None } } pub(crate) fn reorder_node<L>( &mut self, node_index: SubGraphNodeIndex, lambda: L, ) -> SplitGraphResult<()> where L: FnOnce(&[SplitGraphNodeId]) -> Vec<SplitGraphNodeId>, { let Some(ordering_node_index) = self.ordering_node_for_node_index(node_index) else { return Ok(()); }; let Some(SplitGraphNodeWeight::Ordering { order, .. }) = self.graph.node_weight_mut(ordering_node_index) else { return Ok(()); }; let new_order = lambda(order.as_slice()); // Validate the return here to prevent a panic in copy from slice, and to prevent removal of ordered children if new_order.len() != order.len() { return Err(SplitGraphError::OrderLengthMismatch); } for id in order.iter() { if !new_order.contains(id) { return Err(SplitGraphError::OrderContentMismatch); } } order.copy_from_slice(new_order.as_slice()); self.touch_node(ordering_node_index); Ok(()) } pub(crate) fn ordered_children( &self, node_index: SubGraphNodeIndex, ) -> Option<Vec<SubGraphNodeIndex>> { let ordering_node_index = self.ordering_node_for_node_index(node_index)?; let SplitGraphNodeWeight::Ordering { order, .. } = self.graph.node_weight(ordering_node_index)? 
else { return None; }; Some( order .iter() .filter_map(|id| self.node_index_by_id.get(id).copied()) .collect(), ) } pub fn root_node_merkle_tree_hash(&self) -> MerkleTreeHash { self.graph .node_weight(self.root_index) .map(|node| node.merkle_tree_hash()) .unwrap_or(MerkleTreeHash::nil()) } pub(crate) fn recalculate_entire_merkle_tree_hash(&mut self) { let mut dfs = petgraph::visit::DfsPostOrder::new(&self.graph, self.root_index); while let Some(node_index) = dfs.next(&self.graph) { if let Some((hash, _)) = self.calculate_merkle_hash_for_node(node_index) { if let Some(node_weight_mut) = self.graph.node_weight_mut(node_index) { node_weight_mut.set_merkle_tree_hash(hash); } } } } pub(crate) fn recalculate_merkle_tree_hash_based_on_touched_nodes(&mut self) -> (usize, usize) { let mut node_count = 0; let mut edge_count = 0; let big_start = Instant::now(); let mut dfs = petgraph::visit::DfsPostOrder::new(&self.graph, self.root_index); let mut discovered_nodes = HashSet::new(); if self.touched_nodes.is_empty() { warn!( "touched nodes empty. 
merkle tree hash calculated in {:?}", big_start.elapsed() ); return (0, 0); } while let Some(node_index) = dfs.next(&self.graph) { node_count += 1; if self.touched_nodes.contains(&node_index) || discovered_nodes.contains(&node_index) { if let Some((hash, edges_hashed)) = self.calculate_merkle_hash_for_node(node_index) { edge_count += edges_hashed; if let Some(node_weight_mut) = self.graph.node_weight_mut(node_index) { node_weight_mut.set_merkle_tree_hash(hash); } } for incoming_source_idx in self.graph.neighbors_directed(node_index, Incoming) { discovered_nodes.insert(incoming_source_idx); } } } self.touched_nodes.clear(); warn!("merkle tree hash calculated in {:?}", big_start.elapsed()); (node_count, edge_count) } pub(crate) fn all_outgoing_stably_ordered( &self, node_index: SubGraphNodeIndex, ) -> Vec<(&SplitGraphEdgeWeight<E, K, N>, SubGraphNodeIndex)> { let ordered_children = self.ordered_children(node_index).unwrap_or_default(); let ordered_children_edges: Vec<(_, _)> = ordered_children .iter() .flat_map(|child_index| { self.graph .edges_connecting(node_index, *child_index) .map(|edge_ref| (edge_ref.weight(), *child_index)) }) .collect(); let mut unordered_children: Vec<(_, _, _)> = self .graph .edges_directed(node_index, Outgoing) .filter(|edge_ref| !ordered_children.contains(&edge_ref.target())) .filter_map(|edge_ref| { self.graph .node_weight(edge_ref.target()) .map(|weight| (weight.id(), edge_ref.weight(), edge_ref.target())) }) .collect(); // We want to keep the "unordered" children stably sorted as well, // so that we get the same hash every time if there are no changes unordered_children.sort_by_cached_key(|(id, _, _)| *id); let mut all_children = Vec::with_capacity(ordered_children.len() + unordered_children.len()); all_children.extend(ordered_children_edges); all_children.extend( unordered_children .into_iter() .map(|(_, weight, index)| (weight, index)), ); all_children } fn calculate_merkle_hash_for_node( &self, node_index: SubGraphNodeIndex, ) -> 
Option<(MerkleTreeHash, usize)> { let mut edge_count = 0; let mut hasher = MerkleTreeHash::hasher(); let node_weight = self.graph.node_weight(node_index)?; hasher.update(node_weight.node_hash().as_bytes()); hasher.update(&node_weight.id().inner().to_bytes()); let all_outgoing_stably_ordered = self.all_outgoing_stably_ordered(node_index); let mut hashed_children = HashSet::new(); for (edge_weight, child_idx) in all_outgoing_stably_ordered { edge_count += 1; if !hashed_children.contains(&child_idx) { hasher.update( self.graph .node_weight(child_idx)? .merkle_tree_hash() .as_bytes(), ); hashed_children.insert(child_idx); } if let Some(edge_entropy) = edge_weight.edge_entropy() { hasher.update(edge_entropy.as_slice()); } } Some((hasher.finalize(), edge_count)) } pub(crate) fn node_id_to_index(&self, id: SplitGraphNodeId) -> Option<SubGraphNodeIndex> { self.node_index_by_id.get(&id).copied() } /// Adds a SplitGraphEdgeWeight if one of the exact same kind does not exist between `from_index` /// and `to_index` and touches `from_index` so that the merkle tree hash will be recalculated. 
pub(crate) fn add_edge_raw( &mut self, from_index: SubGraphNodeIndex, edge_weight: SplitGraphEdgeWeight<E, K, N>, to_index: SubGraphNodeIndex, ) -> Option<SubGraphEdgeIndex> { if !self.edge_exists(from_index, &edge_weight, to_index) { self.touch_node(from_index); Some(self.graph.add_edge(from_index, to_index, edge_weight)) } else { None } } pub(crate) fn remove_node(&mut self, node_index: SubGraphNodeIndex) { let Some((node_id, lineage_id)) = self .graph .node_weight(node_index) .map(|n| (n.id(), n.lineage_id())) else { return; }; let parents: Vec<_> = self .graph .neighbors_directed(node_index, Incoming) .collect(); self.graph.remove_node(node_index); self.remove_ids_from_indexes(node_id, lineage_id); parents .into_iter() .for_each(|parent_idx| self.touch_node(parent_idx)); } pub(crate) fn add_or_get_ordering_node_for_node_index( &mut self, node_index: SubGraphNodeIndex, ) -> SubGraphNodeIndex { match self.ordering_node_for_node_index(node_index) { Some(existing_ordering_node_index) => existing_ordering_node_index, None => { let new_ordering_node_id = SplitGraphNodeId::new(); let ordering_node_index = self.graph.add_node(SplitGraphNodeWeight::Ordering { id: new_ordering_node_id, order: vec![], merkle_tree_hash: MerkleTreeHash::nil(), }); self.node_index_by_id .insert(new_ordering_node_id, ordering_node_index); self.add_edge_raw( node_index, SplitGraphEdgeWeight::Ordering, ordering_node_index, ); self.touch_node(node_index); self.touch_node(ordering_node_index); ordering_node_index } } } pub(crate) fn add_ordered_edge( &mut self, from_index: SubGraphNodeIndex, edge_weight: SplitGraphEdgeWeight<E, K, N>, to_index: SubGraphNodeIndex, ) -> SplitGraphResult<(Option<SubGraphEdgeIndex>, Option<SubGraphEdgeIndex>)> { let target_id = self .graph .node_weight(to_index) .map(|n| n.id()) .ok_or(SplitGraphError::NodeNotFoundAtIndex)?; let ordering_node_index = self.add_or_get_ordering_node_for_node_index(from_index); let mut ordinal_edge = None; if let 
Some(SplitGraphNodeWeight::Ordering { order, .. }) = self.graph.node_weight_mut(ordering_node_index) { if !order.contains(&target_id) { order.push(target_id); } ordinal_edge = self.add_edge_raw(ordering_node_index, SplitGraphEdgeWeight::Ordinal, to_index); } let edge = self.add_edge_raw(from_index, edge_weight, to_index); Ok((edge, ordinal_edge)) } /// Add an edge between `from_index` and `to_index` if the edge does not exist. pub(crate) fn add_edge( &mut self, from_index: SubGraphNodeIndex, edge_weight: SplitGraphEdgeWeight<E, K, N>, to_index: SubGraphNodeIndex, ) -> Option<SubGraphEdgeIndex> { self.add_edge_raw(from_index, edge_weight, to_index) } pub(crate) fn touch_node(&mut self, node_index: SubGraphNodeIndex) { self.touched_nodes.insert(node_index); } pub(crate) fn update_external_target_ids( &mut self, from_index: SubGraphNodeIndex, old_target_id: SplitGraphNodeId, new_target_id: SplitGraphNodeId, ) { self.touch_node(from_index); let external_target_node_indexes: Vec<_> = self .graph .neighbors_directed(from_index, Outgoing) .filter(|neighbor_index| { self.graph .node_weight(*neighbor_index) .is_some_and(|weight| weight.external_target_id() == Some(old_target_id)) }) .collect(); for neighbor_index in external_target_node_indexes { if let Some(SplitGraphNodeWeight::ExternalTarget { target, .. 
}) = self.graph.node_weight_mut(neighbor_index) { *target = new_target_id; } self.touch_node(neighbor_index); } } pub(crate) fn remove_external_source_edge( &mut self, from_index: SubGraphNodeIndex, to_index: SubGraphNodeIndex, external_source_data: ExternalSourceData<K, N>, ) { self.touch_node(from_index); let edge_indexes: Vec<_> = self .graph .edges_connecting(from_index, to_index) .filter(|edge_ref| { edge_ref.weight().external_source_data().as_ref() == Some(&external_source_data) }) .map(|edge_ref| edge_ref.id()) .collect(); for edge_index in edge_indexes { self.graph.remove_edge(edge_index); } } /// Removes all edges between `from_index` and `to_index` that match the passed in kind. /// Also handles removing any correspond pub(crate) fn remove_edge_raw( &mut self, from_index: SubGraphNodeIndex, kind: SplitGraphEdgeWeightKind<K>, to_index: SubGraphNodeIndex, ) { self.touch_node(from_index); let edge_indexes: Vec<_> = self .graph .edges_directed(from_index, Outgoing) .filter(|edge_ref| kind == edge_ref.weight().into() && edge_ref.target() == to_index) .map(|edge_ref| edge_ref.id()) .collect(); for edge_index in edge_indexes { self.graph.remove_edge(edge_index); } } pub(crate) fn remove_from_order( &mut self, ordering_node_index: SubGraphNodeIndex, item_id: SplitGraphNodeId, ) { if let Some(SplitGraphNodeWeight::Ordering { order, .. }) = self.graph.node_weight_mut(ordering_node_index) { order.retain(|id| *id != item_id); } } /// Removes the edge specified by `edge_index`. Also handles edges to and /// from the ordering node, if one exists for `from_index`, and removes /// the target from the order. 
pub(crate) fn remove_edge_by_index(&mut self, edge_index: EdgeIndex<usize>) { if let Some((from_index, to_index)) = self.graph.edge_endpoints(edge_index) { self.touch_node(from_index); let target_id = self.graph.node_weight(to_index).map(|n| n.id()).unwrap(); if let Some(ordering_node_index) = self .graph .edges_directed(from_index, Outgoing) .find(|edge_ref| matches!(edge_ref.weight(), SplitGraphEdgeWeight::Ordering)) .map(|edge_ref| edge_ref.target()) { self.touch_node(ordering_node_index); if let Some(ordinal_edge_index) = self .graph .edges_directed(ordering_node_index, Outgoing) .find(|edge_ref| { matches!(edge_ref.weight(), SplitGraphEdgeWeight::Ordinal) && self.graph.node_weight(edge_ref.target()).map(|n| n.id()) == Some(target_id) }) .map(|edge_ref| edge_ref.id()) { self.graph.remove_edge(ordinal_edge_index); self.remove_from_order(ordering_node_index, target_id); } } self.graph.remove_edge(edge_index); } } pub(crate) fn nodes(&self) -> impl Iterator<Item = &SplitGraphNodeWeight<N>> { self.graph .node_indices() .filter_map(|node_index| self.graph.node_weight(node_index)) } pub(crate) fn edges( &self, ) -> impl Iterator< Item = ( &SplitGraphEdgeWeight<E, K, N>, SplitGraphNodeId, SplitGraphNodeId, ), > { self.graph.edge_indices().filter_map(|edge_index| { self.graph.edge_weight(edge_index).and_then(|edge_weight| { self.graph .edge_endpoints(edge_index) .and_then(|(source_idx, target_idx)| { self.graph .node_weight(source_idx) .zip(self.graph.node_weight(target_idx)) .map(|(source, target)| (edge_weight, source.id(), target.id())) }) }) }) } pub(crate) fn tiny_dot_to_file(&self, name: &str) { let dot = petgraph::dot::Dot::with_attr_getters( &self.graph, &[ petgraph::dot::Config::NodeNoLabel, petgraph::dot::Config::EdgeNoLabel, ], &|_, edge_ref| { let (label, color) = match edge_ref.weight() { SplitGraphEdgeWeight::Custom(_) => ("".into(), "black"), SplitGraphEdgeWeight::ExternalSource { source_id, .. 
} => { (format!("external source: {source_id}\n"), "red") } SplitGraphEdgeWeight::Ordering => ("ordering".into(), "green"), SplitGraphEdgeWeight::Ordinal => ("ordinal".into(), "green"), }; format!("label = \"{label}\"\ncolor = {color}") }, &|_, (node_idx, node_weight)| { let (label, color) = match node_weight { SplitGraphNodeWeight::Custom(n) => { let node_dbg = n.dot_details(); ( format!("node: {} ({node_idx:?})\n{node_dbg}", n.id()), "black", ) } SplitGraphNodeWeight::ExternalTarget { target, .. } => { (format!("{node_idx:?}external target: {target}",), "red") } SplitGraphNodeWeight::GraphRoot { id, .. } => { (format!("graph root: {id} ({node_idx:?})"), "blue") } SplitGraphNodeWeight::SubGraphRoot { id, .. } => { (format!("subgraph root: {id} ({node_idx:?})"), "blue") } SplitGraphNodeWeight::Ordering { id, .. } => { (format!("ordering: {id} ({node_idx:?})"), "green") } }; format!("label = \"{label}\"\ncolor = {color}") }, ); #[allow(clippy::disallowed_methods)] let home_str = std::env::var("HOME").expect("could not find home directory via env"); let home = std::path::Path::new(&home_str); let mut file = std::fs::File::create(home.join(format!("{name}.txt"))).expect("could not create file"); file.write_all(format!("{dot:?}").as_bytes()) .expect("could not write file"); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.