Skip to main content
Glama
lib.rs74.9 kB
use std::{ collections::{ BTreeMap, HashSet, VecDeque, }, marker::PhantomData, time::Instant, }; use dashmap::DashMap; use opt_zip::OptZip; use petgraph::prelude::*; use petgraph_traits::{ RawSplitGraphEdges, SplitGraphEdgeReference, SplitGraphEdges, SplitGraphNeighbors, }; use serde::{ Deserialize, Serialize, de::DeserializeOwned, }; use si_events::{ ContentHash, merkle_tree_hash::MerkleTreeHash, workspace_snapshot::{ Change, EntityKind, }, }; use si_id::ulid::Ulid; use strum::EnumDiscriminants; use telemetry::prelude::*; use thiserror::Error; pub mod opt_zip; pub mod petgraph_traits; pub mod subgraph; pub mod subgraph_address; pub mod updates; pub use subgraph::{ SubGraph, SubGraphEdgeIndex, SubGraphNodeIndex, }; pub use subgraph_address::SubGraphAddress; use updates::ExternalSourceData; pub use updates::Update; #[derive(Error, Debug)] pub enum SplitGraphError { #[error("Node id {0} not found")] NodeNotFound(SplitGraphNodeId), #[error("Node at index not found, this is a bug")] NodeNotFoundAtIndex, #[error("The splitgraph root is missing")] RootNodeNotFound, #[error("reorder must contain all the same ids as the original")] OrderContentMismatch, #[error("reorder must be of the same length as original")] OrderLengthMismatch, #[error("too many edges of kind {2} {1:?} to/from {0}")] TooManyEdgesOfKind(SplitGraphNodeId, Direction, String), #[error("No subgraph at index: {0}")] SubGraphMissing(usize), #[error("error reading subgraph with address {0:?}: {1}")] SubGraphRead(SubGraphAddress, String), #[error("error writing subgraph: {0}")] SubGraphWrite(String), #[error("edge would create a graph cycle")] WouldCreateGraphCycle, } pub type SplitGraphResult<T> = Result<T, SplitGraphError>; pub type SplitGraphNodeId = Ulid; pub type SubGraphIndex = usize; #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, EnumDiscriminants)] #[strum_discriminants(derive(Hash, Serialize, Deserialize))] pub enum SplitGraphNodeWeight<N> where N: CustomNodeWeight, { /// The node weight 
kind provided by users of this crate Custom(N), /// A placeholder for an edge that points to a node in another subgraph ExternalTarget { id: SplitGraphNodeId, target: SplitGraphNodeId, merkle_tree_hash: MerkleTreeHash, target_kind: SplitGraphNodeWeightDiscriminants, target_custom_kind: Option<N::Kind>, }, /// The ordering node for an ordered container Ordering { id: SplitGraphNodeId, order: Vec<SplitGraphNodeId>, merkle_tree_hash: MerkleTreeHash, }, /// The root node for the entire graph GraphRoot { id: SplitGraphNodeId, merkle_tree_hash: MerkleTreeHash, }, /// The root node for a subgraph (besides the first subgraph, which has the GraphRoot root) SubGraphRoot { id: SplitGraphNodeId, merkle_tree_hash: MerkleTreeHash, }, } impl<N> SplitGraphNodeWeight<N> where N: CustomNodeWeight, { pub fn id(&self) -> SplitGraphNodeId { match self { SplitGraphNodeWeight::Custom(n) => n.id(), SplitGraphNodeWeight::ExternalTarget { id, .. } | SplitGraphNodeWeight::Ordering { id, .. } | SplitGraphNodeWeight::GraphRoot { id, .. } | SplitGraphNodeWeight::SubGraphRoot { id, .. } => *id, } } pub fn set_id(&mut self, new_id: SplitGraphNodeId) { match self { SplitGraphNodeWeight::Custom(n) => n.set_id(new_id), SplitGraphNodeWeight::ExternalTarget { id, .. } | SplitGraphNodeWeight::Ordering { id, .. } | SplitGraphNodeWeight::GraphRoot { id, .. } | SplitGraphNodeWeight::SubGraphRoot { id, .. } => *id = new_id, } } pub fn lineage_id(&self) -> SplitGraphNodeId { match self { SplitGraphNodeWeight::Custom(n) => n.lineage_id(), other => other.id(), } } pub fn set_lineage_id(&mut self, new_lineage_id: SplitGraphNodeId) { if let SplitGraphNodeWeight::Custom(n) = self { n.set_lineage_id(new_lineage_id); } } pub fn entity_kind(&self) -> EntityKind { match self { SplitGraphNodeWeight::Custom(c) => c.entity_kind(), SplitGraphNodeWeight::ExternalTarget { .. } => EntityKind::ExternalTarget, SplitGraphNodeWeight::Ordering { .. } => EntityKind::Ordering, SplitGraphNodeWeight::GraphRoot { .. 
} => EntityKind::Root, SplitGraphNodeWeight::SubGraphRoot { .. } => EntityKind::SubGraphRoot, } } pub fn set_merkle_tree_hash(&mut self, hash: MerkleTreeHash) { match self { SplitGraphNodeWeight::Custom(n) => n.set_merkle_tree_hash(hash), SplitGraphNodeWeight::ExternalTarget { merkle_tree_hash, .. } | SplitGraphNodeWeight::Ordering { merkle_tree_hash, .. } | SplitGraphNodeWeight::GraphRoot { merkle_tree_hash, .. } | SplitGraphNodeWeight::SubGraphRoot { merkle_tree_hash, .. } => *merkle_tree_hash = hash, } } pub fn merkle_tree_hash(&self) -> MerkleTreeHash { match self { SplitGraphNodeWeight::Custom(n) => n.merkle_tree_hash(), SplitGraphNodeWeight::ExternalTarget { merkle_tree_hash, .. } | SplitGraphNodeWeight::Ordering { merkle_tree_hash, .. } | SplitGraphNodeWeight::GraphRoot { merkle_tree_hash, .. } | SplitGraphNodeWeight::SubGraphRoot { merkle_tree_hash, .. } => *merkle_tree_hash, } } pub fn node_hash(&self) -> ContentHash { let mut hasher = ContentHash::hasher(); hasher.update(&self.id().inner().to_bytes()); match self { SplitGraphNodeWeight::Custom(c) => { hasher.update(c.node_hash().as_bytes()); } SplitGraphNodeWeight::ExternalTarget { target, .. } => { hasher.update(&target.inner().to_bytes()); } SplitGraphNodeWeight::Ordering { order, .. } => { for id in order { hasher.update(&id.inner().to_bytes()); } } SplitGraphNodeWeight::GraphRoot { .. } | SplitGraphNodeWeight::SubGraphRoot { .. } => {} }; hasher.finalize() } pub fn custom_mut(&mut self) -> Option<&mut N> { match self { SplitGraphNodeWeight::Custom(inner) => Some(inner), _ => None, } } pub fn custom(&self) -> Option<&N> { match self { SplitGraphNodeWeight::Custom(inner) => Some(inner), _ => None, } } pub fn custom_kind(&self) -> Option<N::Kind> { match self { SplitGraphNodeWeight::Custom(inner) => Some(inner.kind()), _ => None, } } pub fn external_target_id(&self) -> Option<SplitGraphNodeId> { match self { SplitGraphNodeWeight::ExternalTarget { target, .. 
} => Some(*target), _ => None, } } pub fn external_target_custom_kind(&self) -> Option<N::Kind> { match self { SplitGraphNodeWeight::ExternalTarget { target_custom_kind, .. } => *target_custom_kind, _ => None, } } } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum SplitGraphEdgeWeight<E, K, N> where E: CustomEdgeWeight<K>, K: EdgeKind, N: CustomNodeWeight, { Custom(E), ExternalSource { source_id: SplitGraphNodeId, is_default: bool, edge_kind: K, source_node_kind: Option<N::Kind>, #[serde(skip)] phantom_n: PhantomData<N>, }, Ordering, Ordinal, } #[derive(PartialEq, Eq, Copy, Clone, Debug, Serialize, Deserialize, Hash)] pub enum SplitGraphEdgeWeightKind<K> where K: EdgeKind, { Custom(K), ExternalSource, Ordering, Ordinal, } impl<E, K, N> From<SplitGraphEdgeWeight<E, K, N>> for SplitGraphEdgeWeightKind<K> where E: CustomEdgeWeight<K>, N: CustomNodeWeight, K: EdgeKind, { fn from(value: SplitGraphEdgeWeight<E, K, N>) -> Self { match value { SplitGraphEdgeWeight::Custom(c) => SplitGraphEdgeWeightKind::Custom(c.kind()), SplitGraphEdgeWeight::ExternalSource { .. } => SplitGraphEdgeWeightKind::ExternalSource, SplitGraphEdgeWeight::Ordering => SplitGraphEdgeWeightKind::Ordering, SplitGraphEdgeWeight::Ordinal => SplitGraphEdgeWeightKind::Ordinal, } } } impl<E, K, N> From<&SplitGraphEdgeWeight<E, K, N>> for SplitGraphEdgeWeightKind<K> where E: CustomEdgeWeight<K>, N: CustomNodeWeight, K: EdgeKind, { fn from(value: &SplitGraphEdgeWeight<E, K, N>) -> Self { match value { SplitGraphEdgeWeight::Custom(c) => SplitGraphEdgeWeightKind::Custom(c.kind()), SplitGraphEdgeWeight::ExternalSource { .. 
} => SplitGraphEdgeWeightKind::ExternalSource, SplitGraphEdgeWeight::Ordering => SplitGraphEdgeWeightKind::Ordering, SplitGraphEdgeWeight::Ordinal => SplitGraphEdgeWeightKind::Ordinal, } } } impl<E, K, N> SplitGraphEdgeWeight<E, K, N> where E: CustomEdgeWeight<K>, N: CustomNodeWeight, K: EdgeKind, { pub fn custom(&self) -> Option<&E> { match self { SplitGraphEdgeWeight::Custom(weight) => Some(weight), _ => None, } } pub fn custom_kind(&self) -> Option<K> { match self { SplitGraphEdgeWeight::Custom(weight) => Some(weight.kind()), _ => None, } } pub fn is_default(&self) -> bool { match self { SplitGraphEdgeWeight::Custom(c) => c.is_default(), SplitGraphEdgeWeight::ExternalSource { is_default, .. } => *is_default, SplitGraphEdgeWeight::Ordering => false, SplitGraphEdgeWeight::Ordinal => false, } } pub fn clone_as_non_default(&self) -> Self { match self { SplitGraphEdgeWeight::Custom(c) => { SplitGraphEdgeWeight::Custom(c.clone_as_non_default()) } SplitGraphEdgeWeight::ExternalSource { source_id, edge_kind, source_node_kind, .. } => SplitGraphEdgeWeight::ExternalSource { source_id: *source_id, is_default: false, edge_kind: *edge_kind, source_node_kind: *source_node_kind, phantom_n: PhantomData, }, SplitGraphEdgeWeight::Ordering => SplitGraphEdgeWeight::Ordering, SplitGraphEdgeWeight::Ordinal => SplitGraphEdgeWeight::Ordinal, } } pub fn external_source_data(&self) -> Option<ExternalSourceData<K, N>> { match self { SplitGraphEdgeWeight::ExternalSource { source_id, edge_kind, source_node_kind, .. } => Some(ExternalSourceData { source_id: *source_id, edge_kind: *edge_kind, source_node_kind: *source_node_kind, phantom_n: PhantomData, }), SplitGraphEdgeWeight::Custom(_) | SplitGraphEdgeWeight::Ordering | SplitGraphEdgeWeight::Ordinal => None, } } pub fn set_external_source_id(&mut self, external_source_id: SplitGraphNodeId) { if let SplitGraphEdgeWeight::ExternalSource { source_id, .. 
} = self { *source_id = external_source_id; } } pub fn edge_entropy(&self) -> Option<Vec<u8>> { match self { SplitGraphEdgeWeight::Custom(c) => c.edge_entropy(), SplitGraphEdgeWeight::ExternalSource { source_id, edge_kind: _, .. } => { let mut entropy = vec![]; entropy.extend_from_slice(&source_id.inner().to_bytes()); // entropy must include edge kind! Some(entropy) } SplitGraphEdgeWeight::Ordering | SplitGraphEdgeWeight::Ordinal => None, } } } pub trait EdgeKind: std::hash::Hash + PartialEq + Eq + Copy + Clone + std::fmt::Debug {} pub trait NodeKind: std::hash::Hash + PartialEq + Eq + Copy + Clone + std::fmt::Debug + Serialize + DeserializeOwned { } pub trait CustomNodeWeight: PartialEq + Eq + Clone + std::fmt::Debug + std::hash::Hash { type Kind: NodeKind; fn kind(&self) -> Self::Kind; fn id(&self) -> SplitGraphNodeId; fn set_id(&mut self, id: SplitGraphNodeId); fn lineage_id(&self) -> SplitGraphNodeId; fn set_lineage_id(&mut self, id: SplitGraphNodeId); fn entity_kind(&self) -> EntityKind; fn set_merkle_tree_hash(&mut self, hash: MerkleTreeHash); fn merkle_tree_hash(&self) -> MerkleTreeHash; fn node_hash(&self) -> ContentHash; fn dot_details(&self) -> String; } pub trait CustomEdgeWeight<K>: std::hash::Hash + PartialEq + Eq + Clone + std::fmt::Debug where K: EdgeKind, { fn kind(&self) -> K; fn edge_entropy(&self) -> Option<Vec<u8>>; // Default edges have a rule that there can be only *one* default edge of a certain kind // outgoing from a node. This rule will be enforced when updates are performed. 
fn is_default(&self) -> bool; fn clone_as_non_default(&self) -> Self; } #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct SplitGraphNodeIndex { subgraph: SubGraphIndex, index: SubGraphNodeIndex, } impl SplitGraphNodeIndex { pub fn new(subgraph: SubGraphIndex, index: SubGraphNodeIndex) -> Self { Self { subgraph, index } } pub fn subgraph(&self) -> SubGraphIndex { self.subgraph } } #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct SplitGraphEdgeIndex { subgraph: SubGraphIndex, index: SubGraphEdgeIndex, } impl SplitGraphEdgeIndex { pub fn new(subgraph: SubGraphIndex, index: SubGraphEdgeIndex) -> Self { Self { subgraph, index } } } pub type ExternalSourceMap = BTreeMap<SplitGraphNodeId, Vec<SplitGraphEdgeIndex>>; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SuperGraph { addresses: Vec<SubGraphAddress>, root_index: SplitGraphNodeIndex, external_source_map: ExternalSourceMap, split_max: usize, } impl SuperGraph { pub fn new( split_max: usize, root_index: SplitGraphNodeIndex, external_source_map: ExternalSourceMap, ) -> Self { Self { addresses: vec![], root_index, split_max, external_source_map, } } pub fn split_max(&self) -> usize { self.split_max } pub fn root_index(&self) -> SplitGraphNodeIndex { self.root_index } pub fn add_subgraph_address(&mut self, subgraph_address: SubGraphAddress) { self.addresses.push(subgraph_address); } pub fn addresses(&self) -> &[SubGraphAddress] { self.addresses.as_slice() } pub fn address_for_subgraph(&self, index: usize) -> Option<SubGraphAddress> { self.addresses.get(index).copied() } pub fn external_source_map(&self) -> &ExternalSourceMap { &self.external_source_map } } #[derive(Clone, Debug)] pub struct SplitGraphEdgeIndexes { edge_index: Option<SplitGraphEdgeIndex>, ordinal_edge_index: Option<SplitGraphEdgeIndex>, external_source_edge_index: Option<SplitGraphEdgeIndex>, } #[derive(Clone, Debug)] pub struct SplitGraph<N, E, K> where N: 
CustomNodeWeight, E: CustomEdgeWeight<K>, K: EdgeKind, { supergraph: SuperGraph, subgraphs: Vec<SubGraph<N, E, K>>, id_to_split_graph_index: DashMap<SplitGraphNodeId, SplitGraphNodeIndex>, } impl<N, E, K> SplitGraph<N, E, K> where N: CustomNodeWeight, K: EdgeKind, E: CustomEdgeWeight<K>, { pub fn new(split_max: usize) -> Self { let mut first_subgraph = SubGraph::new(); let root_id = Ulid::new(); let root_index = first_subgraph .graph .add_node(SplitGraphNodeWeight::GraphRoot { id: root_id, merkle_tree_hash: MerkleTreeHash::nil(), }); first_subgraph.node_index_by_id.insert(root_id, root_index); first_subgraph.root_index = root_index; Self { supergraph: SuperGraph { addresses: vec![], root_index: SplitGraphNodeIndex { index: root_index, subgraph: 0, }, split_max, external_source_map: ExternalSourceMap::new(), }, subgraphs: vec![first_subgraph], id_to_split_graph_index: DashMap::new(), } } pub fn remove_external_source_edges(&mut self, id: SplitGraphNodeId) { let mut edge_indexes = vec![]; for (subgraph_idx, subgraph) in self.subgraphs().iter().enumerate() { edge_indexes.extend( subgraph .graph .edges_directed(subgraph.root_index, Outgoing) .filter(|edge_ref| { edge_ref .weight() .external_source_data() .is_some_and(|ext_source| ext_source.source_id() == id) }) .map(|edge_ref| (subgraph_idx, edge_ref.id())), ); } for (subgraph_idx, edge_index) in edge_indexes { self.remove_edge_by_index(SplitGraphEdgeIndex::new(subgraph_idx, edge_index)); } } pub fn brute_search_external_source_edges( &self, id: SplitGraphNodeId, ) -> Vec<SplitGraphEdgeWeight<E, K, N>> { self.subgraphs() .iter() .flat_map(|subgraph| { subgraph .graph .edges_directed(subgraph.root_index, Outgoing) .filter(|edge_ref| { edge_ref .weight() .external_source_data() .is_some_and(|ext_source| ext_source.source_id() == id) }) }) .map(|edge_ref| edge_ref.weight().clone()) .collect() } pub fn external_edge_map_for_id( &self, id: SplitGraphNodeId, ) -> Option<Vec<SplitGraphEdgeIndex>> { 
self.supergraph.external_source_map().get(&id).cloned() } pub fn from_parts(supergraph: SuperGraph, subgraphs: Vec<SubGraph<N, E, K>>) -> Self { Self { supergraph, subgraphs, id_to_split_graph_index: DashMap::new(), } } pub fn node_count(&self) -> usize { self.subgraphs.iter().fold(0, |count, subgraph| { count.saturating_add(subgraph.node_index_by_id.len()) }) } pub fn root_id(&self) -> SplitGraphResult<SplitGraphNodeId> { self.node_weight_by_index(self.supergraph.root_index) .map(|node| node.id()) .ok_or(SplitGraphError::RootNodeNotFound) } pub fn subgraph_count(&self) -> usize { self.subgraphs.len() } pub fn recalculate_merkle_tree_hashes_based_on_touched_nodes(&mut self) { self.subgraphs .iter_mut() .enumerate() .for_each(|(idx, subgraph)| { let (nodes, edges) = subgraph.recalculate_merkle_tree_hash_based_on_touched_nodes(); if idx == 0 { warn!("nodes: {}, edges: {}", nodes, edges); } }); } pub fn recalculate_entire_merkle_tree_hashes(&mut self) { self.subgraphs .iter_mut() .for_each(|subgraph| subgraph.recalculate_entire_merkle_tree_hash()); } pub fn make_node_id(&mut self) -> SplitGraphNodeId { Ulid::new() } fn new_subgraph(&mut self) -> usize { self.supergraph.addresses.push(SubGraphAddress::nil()); let subgraph = SubGraph::new_with_root(); let subgraph_index = self.subgraphs.len(); self.subgraphs.push(subgraph); subgraph_index } fn new_empty_subgraph(&mut self) -> usize { self.supergraph.addresses.push(SubGraphAddress::nil()); let subgraph = SubGraph::new(); let subgraph_index = self.subgraphs.len(); self.subgraphs.push(subgraph); subgraph_index } pub fn supergraph(&self) -> &SuperGraph { &self.supergraph } pub fn subgraphs(&self) -> &[SubGraph<N, E, K>] { self.subgraphs.as_slice() } pub fn subgraph_for_node(&self, node_id: SplitGraphNodeId) -> Option<&SubGraph<N, E, K>> { self.subgraphs .iter() .find(|subgraph| subgraph.node_id_to_index(node_id).is_some()) } pub fn subgraph_root_id_for_node(&self, node_id: SplitGraphNodeId) -> Option<SplitGraphNodeId> { 
self.subgraph_for_node(node_id) .and_then(|subgraph| subgraph.root_id()) } pub fn subgraph_mut_for_node( &mut self, node_id: SplitGraphNodeId, ) -> Option<&mut SubGraph<N, E, K>> { self.subgraphs .iter_mut() .find(|subgraph| subgraph.node_id_to_index(node_id).is_some()) } fn get_subgraph(&self, subgraph_index: usize) -> SplitGraphResult<&SubGraph<N, E, K>> { self.subgraphs .get(subgraph_index) .ok_or(SplitGraphError::SubGraphMissing(subgraph_index)) } fn get_subgraph_mut( &mut self, subgraph_index: usize, ) -> SplitGraphResult<&mut SubGraph<N, E, K>> { self.subgraphs .get_mut(subgraph_index) .ok_or(SplitGraphError::SubGraphMissing(subgraph_index)) } fn add_node_to_subgraph( &mut self, subgraph_index: SubGraphIndex, node: SplitGraphNodeWeight<N>, ) -> SplitGraphResult<SplitGraphNodeIndex> { let subgraph = self.get_subgraph_mut(subgraph_index)?; let node_index = subgraph.add_node(node); Ok(SplitGraphNodeIndex::new(subgraph_index, node_index)) } pub fn add_or_replace_node(&mut self, node: N) -> SplitGraphResult<SplitGraphNodeIndex> { let node_id = node.id(); if let Some(split_graph_index) = self.node_id_to_index(node_id) { let subgraph = self.get_subgraph_mut(split_graph_index.subgraph)?; subgraph.replace_node(split_graph_index.index, SplitGraphNodeWeight::Custom(node)); return Ok(split_graph_index); } let subgraph_index = if let Some((index, _)) = self.subgraphs.iter().enumerate().find(|(_, sub)| { // We add one to the max so that the root node is not part of the count sub.node_index_by_id.len() < (self.supergraph.split_max + 1) }) { index } else { self.new_subgraph() }; let index = self.add_node_to_subgraph(subgraph_index, SplitGraphNodeWeight::Custom(node))?; self.id_to_split_graph_index.insert(node_id, index); Ok(index) } pub fn add_ordered_node(&mut self, node: N) -> SplitGraphResult<SplitGraphNodeIndex> { let split_graph_index = self.add_or_replace_node(node)?; let subgraph = self.get_subgraph_mut(split_graph_index.subgraph)?; 
subgraph.add_or_get_ordering_node_for_node_index(split_graph_index.index); Ok(split_graph_index) } pub fn add_ordering_node_for_node_id( &mut self, node_id: SplitGraphNodeId, ) -> SplitGraphResult<()> { if let Some(split_graph_index) = self.node_id_to_index(node_id) { let subgraph = self.get_subgraph_mut(split_graph_index.subgraph)?; subgraph.add_or_get_ordering_node_for_node_index(split_graph_index.index); } Ok(()) } fn node_weight_by_index(&self, index: SplitGraphNodeIndex) -> Option<&SplitGraphNodeWeight<N>> { self.subgraphs .get(index.subgraph) .and_then(|sub| sub.graph.node_weight(index.index)) } pub fn subgraph_index_for_node(&self, node_id: SplitGraphNodeId) -> Option<usize> { let index = self.node_id_to_index(node_id); index.map(|index| index.subgraph) } pub fn subgraph_root_id(&self, subgraph_index: usize) -> Option<SplitGraphNodeId> { self.subgraphs .get(subgraph_index) .and_then(|sub| sub.root_id()) } pub fn raw_node_weight(&self, node_id: SplitGraphNodeId) -> Option<&SplitGraphNodeWeight<N>> { self.node_id_to_index(node_id).and_then(|index| { self.subgraphs .get(index.subgraph) .and_then(|subgraph| subgraph.graph.node_weight(index.index)) }) } pub fn node_weight(&self, node_id: SplitGraphNodeId) -> Option<&N> { self.raw_node_weight(node_id).and_then(|node| node.custom()) } pub fn node_exists(&self, node_id: SplitGraphNodeId) -> bool { self.node_weight(node_id).is_some() } pub fn raw_node_weight_mut( &mut self, node_id: SplitGraphNodeId, ) -> Option<&mut SplitGraphNodeWeight<N>> { self.node_id_to_index(node_id).and_then(|index| { self.subgraphs .get_mut(index.subgraph) .and_then(|subgraph| subgraph.graph.node_weight_mut(index.index)) }) } pub fn node_weight_mut(&mut self, node_id: SplitGraphNodeId) -> Option<&mut N> { self.raw_node_weight_mut(node_id) .and_then(|weight| weight.custom_mut()) } /// In order to rename a node, we need to update any cross-graph references /// to it. First, we look to see if the node has any external source edges. 
/// If it does, we need to find the edge in the other graph that matches the /// external source edge and update the ExternalTarget node. /// /// Then, we need to find any external targets outgoing from the node. /// Then, lookup the external source edge that corresponds to that /// external target and update the source edge to the new node id. fn rewrite_external_ids(&mut self, old_id: SplitGraphNodeId, new_id: SplitGraphNodeId) { let Some(node_index) = self.node_id_to_index(old_id) else { return; }; let external_sources_incoming_to_old_id: Vec<_> = if let Some(node_subgraph) = self.subgraphs.get(node_index.subgraph) { node_subgraph .graph .edges_directed(node_index.index, Incoming) .filter_map(|edge_ref| match edge_ref.weight() { SplitGraphEdgeWeight::ExternalSource { source_id, .. } => Some(*source_id), SplitGraphEdgeWeight::Custom(_) | SplitGraphEdgeWeight::Ordering | SplitGraphEdgeWeight::Ordinal => None, }) .collect() } else { vec![] }; for external_source in external_sources_incoming_to_old_id { let Some(external_source_index) = self.node_id_to_index(external_source) else { continue; }; let Some(external_source_subgraph) = self.subgraphs.get_mut(external_source_index.subgraph) else { continue; }; external_source_subgraph.update_external_target_ids( external_source_index.index, old_id, new_id, ); } if let Some(this_node_as_external_source_edges) = self.supergraph.external_source_map.remove(&old_id) { let mut new_external_source_edges = vec![]; for external_source_edge_index in this_node_as_external_source_edges { let Some(subgraph_mut) = self.subgraphs.get_mut(external_source_edge_index.subgraph) else { continue; }; let Some(mut edge_weight) = subgraph_mut .graph .edge_weight(external_source_edge_index.index) .cloned() else { continue; }; let Some((from_index, to_index)) = subgraph_mut .graph .edge_endpoints(external_source_edge_index.index) else { continue; }; edge_weight.set_external_source_id(new_id); let edge_index = subgraph_mut .graph 
.add_edge(from_index, to_index, edge_weight); new_external_source_edges.push(SplitGraphEdgeIndex { subgraph: external_source_edge_index.subgraph, index: edge_index, }); subgraph_mut .graph .remove_edge(external_source_edge_index.index); subgraph_mut.touch_node(from_index); } self.supergraph .external_source_map .insert(new_id, new_external_source_edges); } } pub fn update_node_id( &mut self, current_node_id: SplitGraphNodeId, new_id: SplitGraphNodeId, new_lineage_id: SplitGraphNodeId, ) -> SplitGraphResult<()> { let index = self .node_id_to_index(current_node_id) .ok_or(SplitGraphError::NodeNotFound(current_node_id))?; self.rewrite_external_ids(current_node_id, new_id); let node_weight_mut = self .raw_node_weight_mut(current_node_id) .ok_or(SplitGraphError::NodeNotFound(current_node_id))?; let current_lineage_id = node_weight_mut.lineage_id(); node_weight_mut.set_id(new_id); node_weight_mut.set_lineage_id(new_lineage_id); if let Some(subgraph) = self.subgraphs.get_mut(index.subgraph) { subgraph.remove_ids_from_indexes(current_node_id, current_lineage_id); subgraph.add_ids_to_indexes(new_id, new_lineage_id, index.index); subgraph.touch_node(index.index); } self.id_to_split_graph_index.insert(new_id, index); self.id_to_split_graph_index.remove(&current_node_id); Ok(()) } pub fn touch_node(&mut self, node_id: SplitGraphNodeId) { let Some(index) = self.node_id_to_index(node_id) else { return; }; let Some(subgraph) = self.subgraphs.get_mut(index.subgraph) else { return; }; subgraph.touch_node(index.index); } #[inline] pub fn node_id_to_index(&self, id: SplitGraphNodeId) -> Option<SplitGraphNodeIndex> { match self .id_to_split_graph_index .get(&id) .map(|entry_ref| *entry_ref.value()) { Some(index) => Some(index), None => { let index = self .subgraphs .iter() .enumerate() .find(|(_, sub)| sub.node_index_by_id.contains_key(&id)) .and_then(|(idx, sub)| { sub.node_index_by_id .get(&id) .map(|subgraph_index| SplitGraphNodeIndex::new(idx, *subgraph_index)) }); if let 
Some(index) = index { self.id_to_split_graph_index.insert(id, index); } index } } } // pub fn raw_outgoing_edges_from_subgraph_root( // &self, // subgraph: usize, // ) -> Option<Vec<(SplitGraphEdgeWeight<E, K>, SplitGraphNodeWeight<N>)>> { // let subgraph = self.subgraphs().get(subgraph)?; // let root = subgraph.root_index; // Some( // subgraph // .graph // .edges_directed(root, Outgoing) // .filter_map(|edge_ref| { // subgraph // .graph // .node_weight(edge_ref.target()) // .map(|node| (edge_ref.weight().clone(), node.clone())) // }) // .collect(), // ) // } // pub fn raw_outgoing_edges( // &self, // node_id: SplitGraphNodeId, // ) -> Option<Vec<(SplitGraphEdgeWeight<E, K>, SplitGraphNodeWeight<N>)>> { // let index = self.node_id_to_index(node_id)?; // let subgraph = self.subgraphs().get(index.subgraph)?; // Some( // subgraph // .graph // .edges_directed(index.index, Outgoing) // .filter_map(|edge_ref| { // subgraph // .graph // .node_weight(edge_ref.target()) // .map(|node| (edge_ref.weight().clone(), node.clone())) // }) // .collect(), // ) // } // pub fn raw_incoming_edges( // &self, // node_id: SplitGraphNodeId, // ) -> Option<Vec<SplitGraphEdgeWeight<E, K>>> { // let index = self.node_id_to_index(node_id)?; // let subgraph = self.subgraphs().get(index.subgraph)?; // Some( // subgraph // .graph // .edges_directed(index.index, Incoming) // .map(|edge_ref| edge_ref.weight().clone()) // .collect(), // ) // } pub fn remove_node(&mut self, node_id: SplitGraphNodeId) -> SplitGraphResult<()> { if self.node_id_to_index(node_id).is_none() { return Ok(()); } // Although removing a node is enough to remove its edges from a single subgraph, // we have to call remove_edge here for all incoming and outgoing edges in order // to ensure we remove cross graph edges (from external sources and to external targets) let incoming_sources: Vec<_> = self .edges_directed(node_id, Incoming)? 
.map(|edge_ref| (edge_ref.source(), edge_ref.weight().kind())) .collect(); let outgoing_targets: Vec<_> = self .edges_directed(node_id, Outgoing)? .map(|edge_ref| (edge_ref.target(), edge_ref.weight().kind())) .collect(); for (incoming_source, kind) in incoming_sources { self.remove_edge(incoming_source, kind, node_id)?; } for (outgoing_target, kind) in outgoing_targets { self.remove_edge(node_id, kind, outgoing_target)?; } let node_index = self .node_id_to_index(node_id) .ok_or(SplitGraphError::NodeNotFound(node_id))?; let subgraph_idx = node_index.subgraph; let subgraph = self.get_subgraph_mut(subgraph_idx)?; subgraph.remove_node(node_index.index); self.id_to_split_graph_index.remove(&node_id); Ok(()) } pub fn find_edge( &self, from_id: SplitGraphNodeId, to_id: SplitGraphNodeId, edge_weight_kind: K, ) -> Option<&E> { let from_index = self.node_id_to_index(from_id)?; let from_subgraph_idx = from_index.subgraph; let subgraph = self.subgraphs.get(from_subgraph_idx)?; subgraph .graph .edges_directed(from_index.index, Outgoing) .find(|edge_ref| { if Some(edge_weight_kind) == edge_ref.weight().custom().map(|edge| edge.kind()) { match subgraph.graph.node_weight(edge_ref.target()) { Some(node) => match node { SplitGraphNodeWeight::Custom(c) => c.id() == to_id, SplitGraphNodeWeight::ExternalTarget { target, .. 
} => *target == to_id, _ => false, }, None => false, } } else { false } }) .and_then(|edge_ref| edge_ref.weight().custom()) } pub fn remove_edge( &mut self, from_id: SplitGraphNodeId, edge_kind: K, to_id: SplitGraphNodeId, ) -> SplitGraphResult<()> { let from_index = self .node_id_to_index(from_id) .ok_or(SplitGraphError::NodeNotFound(from_id))?; let to_index = self .node_id_to_index(to_id) .ok_or(SplitGraphError::NodeNotFound(to_id))?; self.touch_node(from_id); let from_subgraph_idx = from_index.subgraph; let to_subgraph_idx = to_index.subgraph; if from_subgraph_idx == to_subgraph_idx { let from_subgraph = self.get_subgraph_mut(from_subgraph_idx)?; if let Some(edge_idx) = from_subgraph .graph .edges_directed(from_index.index, Outgoing) .find(|edge_ref| { edge_ref .weight() .custom() .map(|edge| edge.kind() == edge_kind) .unwrap_or(false) && from_subgraph .graph .node_weight(edge_ref.target()) .map(|node| node.id() == to_id) .unwrap_or(false) }) .map(|edge_ref| edge_ref.id()) { from_subgraph.remove_edge_by_index(edge_idx); } } else { let from_subgraph = self.get_subgraph_mut(from_subgraph_idx)?; if let Some(edge_idx) = from_subgraph .graph .edges_directed(from_index.index, Outgoing) .find(|edge_ref| { edge_ref .weight() .custom() .map(|edge| edge.kind() == edge_kind) .unwrap_or(false) && from_subgraph .graph .node_weight(edge_ref.target()) .map(|node| match node { SplitGraphNodeWeight::ExternalTarget { target, .. } => { *target == to_id } _ => false, }) .unwrap_or(false) }) .map(|edge_ref| edge_ref.id()) { from_subgraph.remove_edge_by_index(edge_idx); let to_subgraph = self.get_subgraph_mut(to_subgraph_idx)?; if let Some(edge_idx) = to_subgraph .graph .edges_directed(to_index.index, Incoming) .find(|edge_ref| match edge_ref.weight() { SplitGraphEdgeWeight::ExternalSource { source_id, edge_kind: ek, .. 
} => *source_id == from_id && *ek == edge_kind, _ => false, }) .map(|edge_ref| edge_ref.id()) { to_subgraph.remove_edge_by_index(edge_idx); self.remove_external_source_edge_mapping( from_id, SplitGraphEdgeIndex { subgraph: to_subgraph_idx, index: edge_idx, }, ); } } } Ok(()) }

/// Adds a custom edge from `from_id` to `to_id`, going through the
/// ordered-edge path of the owning subgraph.
///
/// # Errors
///
/// Returns [`SplitGraphError::NodeNotFound`] if either node id cannot be
/// resolved, or any error produced while adding the edge to the subgraph.
pub fn add_ordered_edge(
&mut self,
from_id: SplitGraphNodeId,
edge: E,
to_id: SplitGraphNodeId,
) -> SplitGraphResult<SplitGraphEdgeIndexes> {
    self.add_edge_inner(from_id, edge, to_id, true)
}

/// Adds a custom (unordered) edge from `from_id` to `to_id`.
///
/// # Errors
///
/// Returns [`SplitGraphError::NodeNotFound`] if either node id cannot be
/// resolved, or any error produced while adding the edge to the subgraph.
pub fn add_edge(
&mut self,
from_id: SplitGraphNodeId,
edge: E,
to_id: SplitGraphNodeId,
) -> SplitGraphResult<SplitGraphEdgeIndexes> {
    self.add_edge_inner(from_id, edge, to_id, false)
}

/// Removes every edge recorded in `indexes` (the custom edge, the ordinal
/// edge and the external source edge, whichever are present).
///
/// This only removes the edges themselves; it does not touch the
/// supergraph's external source map.
pub fn remove_edges_by_indexes(&mut self, indexes: SplitGraphEdgeIndexes) {
    for edge_index in [
        indexes.edge_index,
        indexes.ordinal_edge_index,
        indexes.external_source_edge_index,
    ]
    .into_iter()
    .flatten()
    {
        self.remove_edge_by_index(edge_index);
    }
}

/// Removes a single edge by its split graph edge index. Does nothing if the
/// index refers to a subgraph that does not exist.
pub fn remove_edge_by_index(&mut self, index: SplitGraphEdgeIndex) {
    if let Some(subgraph) = self.subgraphs.get_mut(index.subgraph) {
        subgraph.remove_edge_by_index(index.index);
    }
}

/// Adds an unordered edge from `from_id` to `to_id`, then verifies the graph
/// is still acyclic. If the new edge introduced a cycle, the edge is rolled
/// back and an error is returned.
///
/// # Errors
///
/// Returns [`SplitGraphError::WouldCreateGraphCycle`] if the edge would
/// create a cycle, or any error produced by adding the edge.
pub fn add_edge_with_cycle_check(
&mut self,
from_id: SplitGraphNodeId,
edge: E,
to_id: SplitGraphNodeId,
) -> SplitGraphResult<SplitGraphEdgeIndexes> {
    let indexes = self.add_edge_inner(from_id, edge, to_id, false)?;
    if self.is_acyclic_directed() {
        return Ok(indexes);
    }

    // Roll back. Besides the edges themselves, a cross-subgraph edge also
    // registered its external source edge in the supergraph's external
    // source map; drop that entry too so the map does not keep pointing at
    // an edge that no longer exists.
    // NOTE(review): the ExternalTarget node created for a cross-subgraph edge
    // is left in place here; presumably `cleanup` collects it — confirm.
    let external_source_edge_index = indexes.external_source_edge_index;
    self.remove_edges_by_indexes(indexes);
    if let Some(external_source_edge_index) = external_source_edge_index {
        self.remove_external_source_edge_mapping(from_id, external_source_edge_index);
    }

    Err(SplitGraphError::WouldCreateGraphCycle)
}

/// Shared implementation for [`Self::add_edge`], [`Self::add_ordered_edge`]
/// and [`Self::add_edge_with_cycle_check`].
///
/// When both nodes live in the same subgraph the edge is added directly.
/// Otherwise an `ExternalTarget` node standing in for `to_id` is added to the
/// source subgraph and the edge points at it, while the target subgraph gets
/// an `ExternalSource` edge from its root to the real target node. That
/// external source edge is also recorded in the supergraph's external source
/// map under `from_id`.
fn add_edge_inner(
&mut self,
from_id: SplitGraphNodeId,
edge: E,
to_id: SplitGraphNodeId,
ordered: bool,
) -> SplitGraphResult<SplitGraphEdgeIndexes> {
    let from_index = self
        .node_id_to_index(from_id)
        .ok_or(SplitGraphError::NodeNotFound(from_id))?;
    let to_index = self
        .node_id_to_index(to_id)
        .ok_or(SplitGraphError::NodeNotFound(to_id))?;

    let custom_edge_weight = SplitGraphEdgeWeight::Custom(edge.clone());

    let subgraph_edge_index;
    let mut subgraph_ordinal_edge_index = None;
    let mut subgraph_external_source_edge_index = None;

    let from_subgraph_idx = from_index.subgraph;
    let to_subgraph_idx = to_index.subgraph;

    if from_subgraph_idx == to_subgraph_idx {
        // Both endpoints share a subgraph: a plain intra-subgraph edge.
        let from_subgraph = self.get_subgraph_mut(from_subgraph_idx)?;
        if ordered {
            (subgraph_edge_index, subgraph_ordinal_edge_index) = from_subgraph
                .add_ordered_edge(from_index.index, custom_edge_weight, to_index.index)?;
        } else {
            subgraph_edge_index =
                from_subgraph.add_edge(from_index.index, custom_edge_weight, to_index.index);
        }
    } else {
        // Capture the target's kind information before taking any mutable
        // borrow, so it can be stored on the ExternalTarget placeholder.
        let (kind, custom_kind) = {
            let to_subgraph = self.get_subgraph(to_subgraph_idx)?;
            let to_node = to_subgraph
                .node_weight(to_id)
                .ok_or(SplitGraphError::NodeNotFound(to_id))?;
            (to_node.into(), to_node.custom().map(|node| node.kind()))
        };

        let ext_target_id = SplitGraphNodeId::new();
        let ext_target_idx = self.add_node_to_subgraph(
            from_subgraph_idx,
            SplitGraphNodeWeight::ExternalTarget {
                id: ext_target_id,
                target: to_id,
                merkle_tree_hash: MerkleTreeHash::nil(),
                target_kind: kind,
                target_custom_kind: custom_kind,
            },
        )?;

        let from_subgraph = self.get_subgraph_mut(from_subgraph_idx)?;
        let from_node_custom_kind = from_subgraph
            .node_weight(from_id)
            .ok_or(SplitGraphError::NodeNotFound(from_id))?
            .custom_kind();
        if ordered {
            (subgraph_edge_index, subgraph_ordinal_edge_index) = from_subgraph
                .add_ordered_edge(from_index.index, custom_edge_weight, ext_target_idx.index)?;
        } else {
            subgraph_edge_index = from_subgraph.add_edge(
                from_index.index,
                custom_edge_weight,
                ext_target_idx.index,
            );
        }

        // Mirror the edge in the target subgraph as root -> target.
        let to_subgraph = self.get_subgraph_mut(to_subgraph_idx)?;
        subgraph_external_source_edge_index = to_subgraph.add_edge(
            to_subgraph.root_index,
            SplitGraphEdgeWeight::ExternalSource {
                source_id: from_id,
                is_default: edge.is_default(),
                edge_kind: edge.kind(),
                source_node_kind: from_node_custom_kind,
                phantom_n: PhantomData,
            },
            to_index.index,
        );

        if let Some(external_source_edge_index) = &subgraph_external_source_edge_index {
            let source_edge_index = SplitGraphEdgeIndex {
                subgraph: to_subgraph_idx,
                index: *external_source_edge_index,
            };

            self.supergraph
                .external_source_map
                .entry(from_id)
                .and_modify(|external_source_edges| {
                    external_source_edges.push(source_edge_index)
                })
                .or_insert(vec![source_edge_index]);
        }
    }

    Ok(SplitGraphEdgeIndexes {
        edge_index: subgraph_edge_index.map(|index| SplitGraphEdgeIndex {
            subgraph: from_subgraph_idx,
            index,
        }),
        ordinal_edge_index: subgraph_ordinal_edge_index.map(|index| SplitGraphEdgeIndex {
            subgraph: from_subgraph_idx,
            index,
        }),
        external_source_edge_index: subgraph_external_source_edge_index.map(|index| {
            SplitGraphEdgeIndex {
                subgraph: to_subgraph_idx,
                index,
            }
        }),
    })
}

/// Reorders the ordered children of `node_id`. `lambda` receives the current
/// order and returns the new one; the subgraph owning the node applies it.
///
/// # Errors
///
/// Returns [`SplitGraphError::NodeNotFound`] if `node_id` cannot be resolved,
/// or any error produced by the subgraph's own `reorder_node`.
pub fn reorder_node<L>(&mut self, node_id: SplitGraphNodeId, lambda: L) -> SplitGraphResult<()>
where
    L: FnOnce(&[SplitGraphNodeId]) -> Vec<SplitGraphNodeId>,
{
    let split_graph_index = self
        .node_id_to_index(node_id)
        .ok_or(SplitGraphError::NodeNotFound(node_id))?;
    let subgraph = self.get_subgraph_mut(split_graph_index.subgraph)?;

    subgraph.reorder_node(split_graph_index.index, lambda)
}

/// Returns the ordered children of `node_id` as node ids, or `None` if the
/// node (or its subgraph) cannot be found or the subgraph reports no ordered
/// children for it.
///
/// `ExternalTarget` placeholders are translated to the id of the node they
/// stand in for.
pub fn ordered_children(&self, node_id: SplitGraphNodeId) -> Option<Vec<SplitGraphNodeId>> {
    let split_graph_index = self.node_id_to_index(node_id)?;
    let subgraph = self.subgraphs.get(split_graph_index.subgraph)?;

    subgraph
        .ordered_children(split_graph_index.index)
        .map(|node_indexes| {
            node_indexes
                .into_iter()
                .filter_map(|idx| {
                    subgraph.graph.node_weight(idx).map(|n| match n {
                        SplitGraphNodeWeight::ExternalTarget { target, .. } => *target,
                        other => other.id(),
                    })
                })
                .collect()
        })
}

/// Find all ancestors of `node_id` across all subgraphs (includes non-custom nodes).
/// Cycle-safe, and non-recursive.
///
/// The returned list starts with `node_id` itself, followed by its ancestors
/// in breadth-first order; each id appears at most once.
pub fn all_parents_of(
&self,
node_id: SplitGraphNodeId,
) -> SplitGraphResult<Vec<SplitGraphNodeId>> {
    let mut result = vec![];
    let mut seen_list = HashSet::new();
    let mut work_queue = VecDeque::from([node_id]);

    while let Some(node_id) = work_queue.pop_front() {
        // `insert` returns false when the id was already visited.
        if !seen_list.insert(node_id) {
            continue;
        }
        result.push(node_id);

        work_queue.extend(
            self.edges_directed(node_id, Incoming)?
                .map(|edge_ref| edge_ref.source()),
        );
    }

    Ok(result)
}

/// Find the outgoing or incoming neighbor of `from_id` which is connected to
/// `from_id` via an edge with kind `kind`. If there is no such neighbor,
/// returns `None`. But if there is more than one such neighbor, an error
/// is returned.
pub fn directed_unique_neighbor_of_edge_weight_kind(
&self,
from_id: SplitGraphNodeId,
direction: Direction,
kind: K,
) -> SplitGraphResult<Option<Ulid>> {
    let mut matching_edges =
        self.edges_directed_for_edge_weight_kind(from_id, direction, kind)?;

    // No edge of this kind at all: not an error, there is simply no neighbor.
    let first_edge = match matching_edges.next() {
        Some(edge_ref) => edge_ref,
        None => return Ok(None),
    };

    // A second edge of the same kind violates the uniqueness expectation.
    if matching_edges.next().is_some() {
        return Err(SplitGraphError::TooManyEdgesOfKind(
            from_id,
            direction,
            format!("{kind:?}"),
        ));
    }

    // The neighbor is the far end of the edge relative to `from_id`.
    let neighbor_id = match direction {
        Outgoing => first_edge.target(),
        Incoming => first_edge.source(),
    };

    Ok(Some(neighbor_id))
}

/// Iterates over the custom edges of kind `kind` that point at `to_id`.
pub fn incoming_edges(
&self,
to_id: SplitGraphNodeId,
kind: K,
) -> SplitGraphResult<impl Iterator<Item = SplitGraphEdgeReference<'_, E, K>> + '_> {
    self.edges_directed_for_edge_weight_kind(to_id, Direction::Incoming, kind)
}

/// Iterates over the ids of the nodes that have an edge of kind `kind`
/// pointing at `to_id`.
pub fn incoming_sources(
&self,
to_id: SplitGraphNodeId,
kind: K,
) -> SplitGraphResult<impl Iterator<Item = SplitGraphNodeId> + '_> {
    let incoming = self.incoming_edges(to_id, kind)?;

    Ok(incoming.map(|edge_ref| edge_ref.source()))
}

/// Iterates over the custom edges of kind `kind` that leave `from_id`.
pub fn outgoing_edges(
&self,
from_id: SplitGraphNodeId,
kind: K,
) -> SplitGraphResult<impl Iterator<Item = SplitGraphEdgeReference<'_, E, K>> + '_> {
    self.edges_directed_for_edge_weight_kind(from_id, Direction::Outgoing, kind)
}

/// Iterates over the ids of the nodes that `from_id` points at through an
/// edge of kind `kind`.
pub fn outgoing_targets(
&self,
from_id: SplitGraphNodeId,
kind: K,
) -> SplitGraphResult<impl Iterator<Item = SplitGraphNodeId> + '_> {
    let outgoing = self.outgoing_edges(from_id, kind)?;

    Ok(outgoing.map(|edge_ref| edge_ref.target()))
}

/// Iterates over the edges of `from_id` in the given `direction`, keeping
/// only those whose weight has kind `kind`.
pub fn edges_directed_for_edge_weight_kind(
&self,
from_id: SplitGraphNodeId,
direction: Direction,
kind: K,
) -> SplitGraphResult<impl Iterator<Item = SplitGraphEdgeReference<'_, E, K>> + '_> {
    let all_edges = self.edges_directed(from_id, direction)?;

    Ok(all_edges.filter(move |edge_ref| edge_ref.weight().kind() == kind))
}

/// Produces an iterator akin to the petgraph edges_directed iterator, but supports cross-subgraph edges.
/// Note that this iterator does not expose "internal" split graph edges, such as Ordering, Ordinal,
/// or ExternalSource edges. Only the "custom" edges are produced.
pub fn edges_directed(
&self,
from_id: SplitGraphNodeId,
direction: Direction,
) -> SplitGraphResult<SplitGraphEdges<N, E, K>> {
    self.edges_directed_with_debug(from_id, direction, false)
}

/// Like `edges_directed` on the underlying petgraph graph of the subgraph
/// that owns `from_id`: the edges are handed back through
/// [`RawSplitGraphEdges`] without the cross-subgraph handling done by
/// [`Self::edges_directed`].
///
/// # Errors
///
/// Returns [`SplitGraphError::NodeNotFound`] if `from_id` cannot be resolved,
/// or the error from looking up its subgraph.
pub fn raw_edges_directed(
&self,
from_id: SplitGraphNodeId,
direction: Direction,
) -> SplitGraphResult<RawSplitGraphEdges<'_, N, E, K>> {
    let index = self
        .node_id_to_index(from_id)
        .ok_or(SplitGraphError::NodeNotFound(from_id))?;
    let subgraph = self.get_subgraph(index.subgraph)?;

    Ok(RawSplitGraphEdges {
        this_subgraph: subgraph,
        edges: subgraph.graph.edges_directed(index.index, direction),
    })
}

/// Builds a [`SplitGraphNeighbors`] iterator for `from_id` in the given
/// `direction`.
///
/// This never fails: if the node or its subgraph cannot be found, the
/// returned value carries `None` for the subgraph and both underlying
/// iterators.
pub fn raw_neighbors_directed(
&self,
from_id: SplitGraphNodeId,
direction: Direction,
) -> SplitGraphNeighbors<'_, N, E, K> {
    let split_graph_index = self.node_id_to_index(from_id);
    let subgraph = split_graph_index.and_then(|index| self.subgraphs().get(index.subgraph));

    SplitGraphNeighbors {
        direction,
        subgraph,
        // Both underlying iterators are always prepared; `direction` is
        // stored alongside them for the iterator implementation to use.
        incoming_edges: subgraph
            .zip(split_graph_index)
            .map(|(subgraph, split_graph_index)| {
                subgraph
                    .graph
                    .edges_directed(split_graph_index.index, Incoming)
            }),
        outgoing_neighbors: subgraph.zip(split_graph_index).map(
            |(subgraph, split_graph_index)| {
                subgraph
                    .graph
                    .neighbors_directed(split_graph_index.index, Outgoing)
            },
        ),
    }
}

/// Builds a [`SplitGraphNeighbors`] iterator for `from_id` in the `Outgoing`
/// direction. Equivalent to `raw_neighbors_directed(from_id, Outgoing)`.
pub fn raw_neighbors(&self, from_id: SplitGraphNodeId) -> SplitGraphNeighbors<'_, N, E, K> {
    self.raw_neighbors_directed(from_id, Outgoing)
}

/// Same as [`Self::edges_directed`], but the returned iterator has its
/// `debug` flag set.
///
/// # Errors
///
/// Returns [`SplitGraphError::NodeNotFound`] if `from_id` cannot be resolved,
/// or the error from looking up its subgraph.
pub fn edges_directed_debug(
&self,
from_id: SplitGraphNodeId,
direction: Direction,
) -> SplitGraphResult<SplitGraphEdges<N, E, K>> {
    self.edges_directed_with_debug(from_id, direction, true)
}

/// Shared implementation of [`Self::edges_directed`] and
/// [`Self::edges_directed_debug`]; the two differ only in the `debug` flag
/// stored on the returned iterator.
fn edges_directed_with_debug(
&self,
from_id: SplitGraphNodeId,
direction: Direction,
debug: bool,
) -> SplitGraphResult<SplitGraphEdges<N, E, K>> {
    let split_graph_index = self
        .node_id_to_index(from_id)
        .ok_or(SplitGraphError::NodeNotFound(from_id))?;
    let subgraph = self.get_subgraph(split_graph_index.subgraph)?;
    let edges = subgraph
        .graph
        .edges_directed(split_graph_index.index, direction);

    Ok(SplitGraphEdges {
        this_subgraph: subgraph,
        subgraphs: self.subgraphs(),
        edges,
        from_id,
        direction,
        debug,
    })
}

/// Drops `edge_index` from the list of external source edges recorded for
/// `from_id` in the supergraph's external source map. Does nothing if there
/// is no entry for `from_id`.
fn remove_external_source_edge_mapping(
&mut self,
from_id: SplitGraphNodeId,
edge_index: SplitGraphEdgeIndex,
) {
    self.supergraph
        .external_source_map
        .entry(from_id)
        .and_modify(|edges| {
            edges.retain(|existing_edge_index| *existing_edge_index != edge_index)
        });
}

/// Cleanup the split graph by removing nodes with no incoming edges and cleaning up mappings.
/// This is more complicated than before because we need to cascade "external" detection across subgraphs.
/// So we remove all externals, then remove all external source edges, then look for externals yet again,
/// looping until no more edges to delete are found.
///
/// This could be made more efficient by turning the external source edge into a B+tree on each subgraph
/// (keyed by source id) to avoid brute force searches of the edges
pub fn cleanup(&mut self) {
    // Ids returned by `remove_externals` so far, across all subgraphs and passes.
    let mut removal_set = HashSet::new();

    loop {
        let mut found_removals = false;
        for subgraph in self.subgraphs.iter_mut() {
            if !removal_set.is_empty() {
                // Drop root-outgoing edges that carry external source data
                // whose source id is already in the removal set.
                let edges_to_delete: Vec<_> = subgraph
                    .graph
                    .edges_directed(subgraph.root_index, Outgoing)
                    .filter(|edge_ref| {
                        edge_ref.weight().external_source_data().is_some_and(
                            |source_data| removal_set.contains(&source_data.source_id()),
                        )
                    })
                    .map(|edge_ref| edge_ref.id())
                    .collect();
                for edge_idx in edges_to_delete {
                    subgraph.graph.remove_edge(edge_idx);
                }
            }

            let new_removals = subgraph.remove_externals();
            if !found_removals && !new_removals.is_empty() {
                found_removals = true;
            }

            removal_set.extend(new_removals);
        }

        // A full pass over every subgraph removed nothing new: done.
        if !found_removals {
            break;
        }
    }

    for subgraph in self.subgraphs.iter_mut() {
        subgraph.cleanup_maps();
    }

    // The id -> index mapping is emptied wholesale after cleanup.
    self.id_to_split_graph_index.clear();
}

/// Runs [`Self::cleanup`] followed by
/// `recalculate_merkle_tree_hashes_based_on_touched_nodes`, logging how long
/// each step took.
pub fn cleanup_and_merkle_tree_hash(&mut self) {
    // NOTE(review): timings are logged at warn level — confirm this is
    // intended rather than a lower level such as debug.
    let start = Instant::now();
    self.cleanup();
    warn!("cleanup took {:?}", start.elapsed());
    let start = Instant::now();
    self.recalculate_merkle_tree_hashes_based_on_touched_nodes();
    warn!(
        "recalculate_merkle_tree_hashes_based_on_touched_nodes took {:?}",
        start.elapsed()
    );
}

/// Calculate the updates that `updated_graph` has relative to this graph
/// (`self` is treated as the base graph).
///
/// Subgraphs are paired up by position. A subgraph of `updated_graph` with no
/// counterpart in `self` yields an `Update::NewSubGraph` followed by the
/// updates describing its contents.
pub fn detect_updates(&self, updated_graph: &SplitGraph<N, E, K>) -> Vec<Update<N, E, K>> {
    let mut updates = vec![];

    let mut subgraph_iter = OptZip::new(updated_graph.subgraphs.iter(), self.subgraphs.iter());

    // The loop ends as soon as the pattern stops matching, i.e. when the
    // updated side of the pair is no longer `Some`.
    while let Some((Some(updated_subgraph), maybe_base_subgraph)) = subgraph_iter.next() {
        // Subgraphs without a root id are skipped.
        let Some(root_node_id) = updated_subgraph.root_id() else {
            continue;
        };

        match maybe_base_subgraph {
            Some(base_subgraph) => updates.extend(
                updates::Detector::new(base_subgraph, updated_subgraph, root_node_id)
                    .detect_updates()
                    .into_iter(),
            ),
            None => {
                updates.push(Update::NewSubGraph {
                    subgraph_root_id: root_node_id,
                });
                updates.extend(
                    updates::subgraph_as_updates(updated_subgraph, root_node_id).into_iter(),
                )
            }
        }
    }

    updates
}

/// Calculates the list of [`Change`]s that `updated_graph` has relative to
/// this graph (`self` is treated as the base graph).
///
/// The result first lists ancestors of changed nodes that were not themselves
/// detected as changed, followed by the per-subgraph changes.
///
/// # Errors
///
/// Propagates any error from [`Self::all_parents_of`].
pub fn detect_changes(
&self,
updated_graph: &SplitGraph<N, E, K>,
) -> SplitGraphResult<Vec<Change>> {
    let mut changes = vec![];

    let mut subgraph_iter = OptZip::new(updated_graph.subgraphs.iter(), self.subgraphs.iter());
    // Entity ids already accounted for, so ancestors are not reported twice.
    let mut detected_ids = HashSet::new();

    while let Some((Some(updated_subgraph), maybe_base_subgraph)) = subgraph_iter.next() {
        let Some(updated_subgraph_root_id) = updated_subgraph.root_id() else {
            continue;
        };

        match maybe_base_subgraph {
            Some(base_subgraph) => {
                let mut subgraph_changes = updates::Detector::new(
                    base_subgraph,
                    updated_subgraph,
                    updated_subgraph_root_id,
                )
                .detect_changes();

                for subgraph_change in &subgraph_changes {
                    detected_ids.insert(subgraph_change.entity_id);
                }

                // Custom nodes present in the base subgraph but absent from the
                // updated subgraph are appended as changes as well.
                // NOTE(review): these ids are not added to `detected_ids`, so
                // the ancestor pass below may emit a second entry for them —
                // confirm whether that is intended.
                subgraph_changes.extend(
                    base_subgraph
                        .node_index_by_id
                        .keys()
                        .filter(|node_id| {
                            !updated_subgraph.node_index_by_id.contains_key(*node_id)
                        })
                        .filter_map(|node_id| {
                            match base_subgraph
                                .node_id_to_index(*node_id)
                                .and_then(|index| base_subgraph.graph.node_weight(index))
                            {
                                Some(SplitGraphNodeWeight::Custom(c)) => Some(Change {
                                    entity_id: (*node_id).into(),
                                    entity_kind: c.entity_kind(),
                                    merkle_tree_hash: c.merkle_tree_hash(),
                                }),
                                _ => None,
                            }
                        }),
                );

                changes.extend(subgraph_changes);
            }
            None => {
                // This entire subgraph is a new set of changes
                changes.extend(updated_subgraph.graph.node_weights().filter_map(
                    |node_weight| match node_weight {
                        SplitGraphNodeWeight::Custom(_)
                        | SplitGraphNodeWeight::GraphRoot { .. } => {
                            detected_ids.insert(node_weight.id().into());
                            Some(Change {
                                entity_id: node_weight.id().into(),
                                entity_kind: node_weight.entity_kind(),
                                merkle_tree_hash: node_weight.merkle_tree_hash(),
                            })
                        }
                        SplitGraphNodeWeight::ExternalTarget { .. }
                        | SplitGraphNodeWeight::Ordering { .. }
                        | SplitGraphNodeWeight::SubGraphRoot { .. } => None,
                    },
                ));
            }
        }
    }

    let mut final_changes = vec![];
    // Now that we've detected all the changed nodes in every subgraph, we need to detect all
    // the parents of these changed nodes, *across* subgraphs, since these will have also
    // changed. reversed so that parents come before children in the finalized list
    for change in &changes {
        // We want to prefer nodes in the updated graph, since those will be the
        // updated version of these nodes, but when the change is a removal,
        // we have to switch to the base graph
        let graph_to_search = if updated_graph
            .node_id_to_index(change.entity_id.into())
            .is_some()
        {
            updated_graph
        } else {
            self
        };

        for parent_id in graph_to_search
            .all_parents_of(change.entity_id.into())?
            .into_iter()
            .rev()
        {
            // Re-select the graph per parent: the parent itself may exist only
            // in one of the two graphs.
            let graph_to_search = if updated_graph.node_id_to_index(parent_id).is_some() {
                updated_graph
            } else {
                self
            };

            if detected_ids.contains(&parent_id.into()) {
                continue;
            }
            detected_ids.insert(parent_id.into());

            if let Some(
                weight @ SplitGraphNodeWeight::GraphRoot { .. }
                | weight @ SplitGraphNodeWeight::Custom(_),
            ) = graph_to_search.raw_node_weight(parent_id)
            {
                // If we find this node now, that means its merkle tree hash hasn't changed
                // since it was in different subgraph than the child node which *did* change.
                // This just adds a bit of entropy to the changes so that the checksum
                // generated is different. May not be necessary since there *will* be at least
                // one other changed node?
                let mut hasher = MerkleTreeHash::hasher();
                hasher.update(change.merkle_tree_hash.as_bytes());
                hasher.update(weight.merkle_tree_hash().as_bytes());

                final_changes.push(Change {
                    entity_id: parent_id.into(),
                    entity_kind: weight.entity_kind(),
                    merkle_tree_hash: hasher.finalize(),
                });
            }
        }
    }

    final_changes.extend(changes);

    Ok(final_changes)
}

/// Returns `true` if a topological sort of the whole split graph succeeds,
/// i.e. the graph contains no directed cycle.
pub fn is_acyclic_directed(&self) -> bool {
    petgraph::algo::toposort(self, None).is_ok()
}

/// Applies `updates` to this graph, in order.
///
/// Updates that refer to a subgraph root id or node id that cannot be
/// resolved are skipped silently (`continue`) rather than reported.
pub fn perform_updates(&mut self, updates: &[Update<N, E, K>]) {
    // Subgraph root id -> position in `self.subgraphs`; extended as
    // `NewSubGraph` updates are processed.
    let mut subgraph_id_to_index = BTreeMap::new();
    for (subgraph_idx, subgraph) in self.subgraphs.iter().enumerate() {
        if let Some(root_id) = subgraph.root_id() {
            subgraph_id_to_index.insert(root_id, subgraph_idx);
        }
    }

    for update in updates {
        match update {
            Update::NewEdge {
                subgraph_root_id,
                source,
                destination,
                edge_weight,
            } => {
                let Some(subgraph_index) = subgraph_id_to_index.get(subgraph_root_id) else {
                    continue;
                };
                let Some(subgraph) = self.subgraphs.get_mut(*subgraph_index) else {
                    continue;
                };
                let Some((from_index, to_index)) = subgraph
                    .node_id_to_index(source.id)
                    .zip(subgraph.node_id_to_index(destination.id))
                else {
                    continue;
                };

                // A new default edge demotes any other default edge of the same
                // kind leaving the same source before it is added.
                if edge_weight.is_default() {
                    ensure_only_one_default_edge(
                        subgraph,
                        from_index,
                        to_index,
                        edge_weight.clone(),
                    );
                }

                subgraph.add_edge_raw(from_index, edge_weight.clone(), to_index);
            }
            Update::RemoveEdge {
                subgraph_root_id,
                source,
                destination,
                edge_kind,
                external_source_data,
            } => {
                let Some(subgraph_index) = subgraph_id_to_index.get(subgraph_root_id) else {
                    continue;
                };
                let Some(subgraph) = self.subgraphs.get_mut(*subgraph_index) else {
                    continue;
                };
                let Some((from_index, to_index)) = subgraph
                    .node_id_to_index(source.id)
                    .zip(subgraph.node_id_to_index(destination.id))
                else {
                    continue;
                };

                // External source edges are removed through their own path,
                // keyed by the external source data.
                match external_source_data {
                    Some(external_source_data) => {
                        subgraph.remove_external_source_edge(
                            from_index,
                            to_index,
                            external_source_data.clone(),
                        );
                    }
                    None => subgraph.remove_edge_raw(from_index, *edge_kind, to_index),
                }
            }
            Update::RemoveNode {
                subgraph_root_id,
                id,
            } => {
                let Some(subgraph_index) = subgraph_id_to_index.get(subgraph_root_id) else {
                    continue;
                };
                let Some(subgraph) = self.subgraphs.get_mut(*subgraph_index) else {
                    continue;
                };
                let Some(node_index) = subgraph.node_id_to_index(*id) else {
                    continue;
                };

                subgraph.remove_node(node_index);
            }
            Update::ReplaceNode {
                subgraph_root_id,
                node_weight,
                base_graph_node_id,
            } => {
                let Some(subgraph_index) = subgraph_id_to_index.get(subgraph_root_id) else {
                    continue;
                };
                let Some(subgraph) = self.subgraphs.get_mut(*subgraph_index) else {
                    continue;
                };

                // The node may be known in this graph under a different id than
                // the one carried by the replacement weight.
                let node_id_in_base_graph = base_graph_node_id.unwrap_or(node_weight.id());

                match subgraph.node_id_to_index(node_id_in_base_graph) {
                    Some(node_index) => {
                        let previous_id =
                            subgraph.replace_node(node_index, node_weight.clone());
                        // If the id changed, re-key the id -> index mapping.
                        if previous_id
                            .is_some_and(|previous_id| previous_id != node_weight.id())
                        {
                            self.id_to_split_graph_index.insert(
                                node_weight.id(),
                                SplitGraphNodeIndex::new(*subgraph_index, node_index),
                            );
                            if let Some(previous_id) = previous_id {
                                self.id_to_split_graph_index.remove(&previous_id);
                            }
                        }
                    }
                    None => {
                        // Nothing to replace: the weight is added as a new node.
                        let index = subgraph.add_node(node_weight.clone());
                        self.id_to_split_graph_index.insert(
                            node_weight.id(),
                            SplitGraphNodeIndex::new(*subgraph_index, index),
                        );
                    }
                }
            }
            Update::NewNode {
                subgraph_root_id,
                node_weight,
            } => {
                let node_id = node_weight.id();
                let maybe_existing_node_index = self.node_id_to_index(node_id);

                let Some(subgraph_index) = subgraph_id_to_index.get(subgraph_root_id) else {
                    continue;
                };

                if let Some(split_graph_index) = maybe_existing_node_index {
                    // NOTE: the main reason we might see a new node update for an
                    // existing node index is because a node in an "earlier" subgraph
                    // has had its id changed to the id and lineage id of a node in a
                    // *later* subgraph. If there is no corresponding remove node
                    // update for the node in the later subgraph, the graph *WILL*
                    // be in a broken state. So ensure when doing a node id update, that
                    // if you are giving a new node an existing node's id, that you always
                    // remove the existing node first.
                    if split_graph_index.subgraph == *subgraph_index {
                        continue;
                    }
                }

                let Some(subgraph) = self.subgraphs.get_mut(*subgraph_index) else {
                    continue;
                };

                let index = subgraph.add_node(node_weight.clone());
                self.id_to_split_graph_index.insert(
                    node_weight.id(),
                    SplitGraphNodeIndex::new(*subgraph_index, index),
                );
            }
            Update::NewSubGraph { subgraph_root_id } => {
                // Register the new subgraph so later updates in this batch can
                // address it by root id.
                let new_subgraph_index = self.new_empty_subgraph();
                subgraph_id_to_index.insert(*subgraph_root_id, new_subgraph_index);
            }
        }
    }
}

/// Iterates over every node weight of every subgraph, including non-custom
/// split graph node weights.
pub fn raw_nodes(&self) -> impl Iterator<Item = &SplitGraphNodeWeight<N>> {
    self.subgraphs.iter().flat_map(|subgraph| subgraph.nodes())
}

/// Iterates over the custom node weights of every subgraph; node weights for
/// which `custom()` returns `None` are skipped.
pub fn nodes(&self) -> impl Iterator<Item = &N> {
    self.subgraphs
        .iter()
        .flat_map(|subgraph| subgraph.nodes())
        .filter_map(|n| n.custom())
}

/// Iterates over the custom edges of every subgraph as
/// `(edge weight, source id, target id)`; edges for which `custom()` returns
/// `None` are skipped.
pub fn edges(&self) -> impl Iterator<Item = (&E, SplitGraphNodeId, SplitGraphNodeId)> {
    self.subgraphs
        .iter()
        .flat_map(|subgraph| subgraph.edges())
        .filter_map(|(e, source, target)| e.custom().map(|custom| (custom, source, target)))
}

/// Calls `tiny_dot_to_file` on each subgraph, naming them
/// `{prefix}-subgraph-{n}` where `n` is the 1-based subgraph position.
pub fn tiny_dot_to_file(&self, prefix: &str) {
    for (idx, subgraph) in self.subgraphs.iter().enumerate() {
        subgraph.tiny_dot_to_file(&format!("{prefix}-subgraph-{}", idx + 1));
    }
}
}

/// Demotes existing default edges so that a new default edge from
/// `source_idx` to `destination_idx` will be the only default edge of its
/// kind leaving `source_idx`.
///
/// Every outgoing edge of `source_idx` that has the same weight kind as
/// `edge_weight`, is marked default, and targets a node other than
/// `destination_idx` is removed and re-added as a non-default copy.
fn ensure_only_one_default_edge<N, E, K>(
graph: &mut SubGraph<N, E, K>,
source_idx: SubGraphNodeIndex,
destination_idx: SubGraphNodeIndex,
edge_weight: SplitGraphEdgeWeight<E, K, N>,
) where
    N: CustomNodeWeight,
    E: CustomEdgeWeight<K>,
    K: EdgeKind,
{
    let edge_weight_kind: SplitGraphEdgeWeightKind<K> = edge_weight.into();

    // Collected up front so the graph is not mutated while it is being iterated.
    let existing_default_targets: Vec<(_, _)> = graph
        .graph
        .edges_directed(source_idx, Outgoing)
        .filter(|edge_ref| {
            edge_weight_kind == edge_ref.weight().into()
                && edge_ref.weight().is_default()
                && edge_ref.target() != destination_idx
        })
        .map(|edge_ref| (edge_ref.weight().clone(), edge_ref.target()))
        .collect();

    for (edge_weight, target_idx) in existing_default_targets {
        graph.remove_edge_raw(source_idx, edge_weight_kind, target_idx);
        graph.add_edge_raw(source_idx, edge_weight.clone_as_non_default(), target_idx);
    }
}

#[cfg(test)]
mod tests;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.