Skip to main content
Glama
split_snapshot.rs61.2 kB
use std::{ collections::{ HashMap, HashSet, }, sync::{ Arc, atomic::AtomicBool, }, }; use async_trait::async_trait; use corrections::correct_transforms; use itertools::Itertools as _; use petgraph::Direction::{ self, Incoming, Outgoing, }; use serde::{ Deserialize, Serialize, }; use si_events::{ ContentHash, Timestamp, WorkspaceSnapshotAddress, merkle_tree_hash::MerkleTreeHash, workspace_snapshot::{ Change, EntityKind, }, }; use si_id::{ ApprovalRequirementDefinitionId, AttributeValueId, ChangeSetId, ComponentId, EntityId, InputSocketId, SchemaId, SchemaVariantId, UserPk, ViewId, ulid::Ulid, }; use si_layer_cache::LayerDbError; use si_split_graph::{ CustomNodeWeight, SplitGraph, SplitGraphNodeIndex, SplitGraphNodeWeight, SplitGraphResult, SubGraph, SuperGraph, opt_zip::OptZip, }; use strum::{ EnumDiscriminants, EnumIter, EnumString, IntoEnumIterator, }; use telemetry::prelude::*; use tokio::{ sync::{ Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, }, task::JoinSet, time::Instant, }; use super::{ CycleCheckGuard, DependentValueRoot, EntityKindExt, InputSocketExt, SchemaVariantExt, WorkspaceSnapshotError, WorkspaceSnapshotResult, content_address::ContentAddressDiscriminants, graph::LineageId, node_weight::{ CategoryNodeWeight, NodeWeight, NodeWeightError, category_node_weight::CategoryNodeKind, }, traits::{ approval_requirement::ApprovalRequirementExt, diagram::view::ViewExt, socket::input::input_socket_from_node_weight, }, }; use crate::{ ComponentError, DalContext, EdgeWeight, EdgeWeightKind, EdgeWeightKindDiscriminants, InputSocket, NodeWeightDiscriminants, SchemaVariantError, approval_requirement::{ ApprovalRequirement, ApprovalRequirementApprover, ApprovalRequirementDefinition, }, component::ComponentResult, entity_kind::{ EntityKindError, EntityKindResult, }, layer_db_types::{ ViewContent, ViewContentV1, }, slow_rt, socket::input::InputSocketError, workspace_snapshot::{ PropSuggestionsCache, graph::traits::component::ComponentExt as _, traits::component::ComponentExt, }, }; pub mod attribute_prototype; pub mod attribute_prototype_argument; pub mod attribute_value; pub mod corrections; pub mod func; pub mod graph; pub mod prop; pub mod static_argument_value; pub type SplitSnapshotGraphV1 = SplitGraph<NodeWeight, EdgeWeight, EdgeWeightKindDiscriminants>; pub type SplitSnapshotGraphVCurrent = SplitSnapshotGraphV1; pub type SubGraphV1 = SubGraph<NodeWeight, EdgeWeight, EdgeWeightKindDiscriminants>; pub type SubGraphVCurrent = SubGraphV1; pub type UpdateV1 = si_split_graph::Update<NodeWeight, EdgeWeight, EdgeWeightKindDiscriminants>; pub type UpdateVCurrent = UpdateV1; pub type SplitRebaseBatchV1 = Vec<UpdateV1>; pub type SplitRebaseBatchVCurrent = SplitRebaseBatchV1; #[derive(Serialize, Deserialize, Debug, Clone, EnumDiscriminants)] #[strum_discriminants(derive(strum::Display, Serialize, Deserialize, EnumString, EnumIter))] pub enum SplitSnapshotStorage { SuperGraph(SuperGraphVersion), SubGraphV1(SubGraphVersion), } #[derive(Debug, Clone, EnumDiscriminants)] #[strum_discriminants(derive(strum::Display, Serialize, Deserialize, EnumString, EnumIter))] pub enum SplitSnapshotGraph { V1(SplitSnapshotGraphV1), } #[derive(Serialize, Deserialize, Debug, Clone, EnumDiscriminants)] #[strum_discriminants(derive(strum::Display, Serialize, Deserialize, EnumString, EnumIter))] pub enum SuperGraphVersion { V1(SuperGraph), } impl SuperGraphVersionDiscriminants { pub fn current_discriminant() -> Self { Self::V1 } } #[derive(Serialize, Deserialize, Debug, Clone, EnumDiscriminants)] #[strum_discriminants(derive(strum::Display, Serialize, Deserialize, EnumString, EnumIter))] pub enum SubGraphVersion { V1(SubGraphV1), } impl SubGraphVersionDiscriminants { pub fn current() -> Self { Self::V1 } } impl std::ops::Deref for SplitSnapshotGraph { type Target = SplitSnapshotGraphVCurrent; fn deref(&self) -> &Self::Target { self.inner() } } impl std::ops::DerefMut for SplitSnapshotGraph { fn deref_mut(&mut self) -> &mut Self::Target { self.inner_mut() } } impl SplitSnapshotGraph { pub fn inner(&self) -> &SplitSnapshotGraphVCurrent { match self { SplitSnapshotGraph::V1(inner) => inner, } } pub fn inner_mut(&mut self) -> &mut SplitSnapshotGraphVCurrent { match self { SplitSnapshotGraph::V1(inner) => inner, } } pub fn current_discriminant() -> SplitSnapshotGraphDiscriminants { SplitSnapshotGraphDiscriminants::iter() .next_back() .expect("Unable to get last element of an iterator guaranteed to have elements") } } #[must_use = "if unused the lock will be released immediately"] struct SnapshotReadGuard<'a> { read_only_graph: Arc<SplitSnapshotGraph>, working_copy_read_guard: RwLockReadGuard<'a, Option<SplitSnapshotGraphVCurrent>>, } #[must_use = "if unused the lock will be released immediately"] struct SnapshotWriteGuard<'a> { working_copy_write_guard: RwLockWriteGuard<'a, Option<SplitSnapshotGraphVCurrent>>, } impl std::ops::Deref for SnapshotReadGuard<'_> { type Target = SplitSnapshotGraphVCurrent; fn deref(&self) -> &Self::Target { if self.working_copy_read_guard.is_some() { let option = &*self.working_copy_read_guard; option.as_ref().expect("we confirmed it was some above") } else { &self.read_only_graph } } } impl std::ops::Deref for SnapshotWriteGuard<'_> { type Target = SplitSnapshotGraphVCurrent; fn deref(&self) -> &Self::Target { let option = &*self.working_copy_write_guard; option.as_ref().expect( "attempted to deref snapshot without copying contents into the mutable working copy", ) } } impl std::ops::DerefMut for SnapshotWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { let option: &mut Option<SplitSnapshotGraphVCurrent> = &mut self.working_copy_write_guard; &mut *option.as_mut().expect("attempted to DerefMut a snapshot without copying contents into the mutable working copy") } } #[derive(Debug, Clone)] pub struct SplitSnapshot { address: Arc<Mutex<WorkspaceSnapshotAddress>>, read_only_graph: Arc<SplitSnapshotGraph>, working_copy: Arc<RwLock<Option<SplitSnapshotGraphVCurrent>>>, cycle_check: Arc<AtomicBool>, dvu_roots: Arc<Mutex<HashSet<DependentValueRoot>>>, prop_suggestions: Arc<PropSuggestionsCache>, } impl SplitSnapshot { pub async fn id(&self) -> WorkspaceSnapshotAddress { *self.address.lock().await } pub fn from_graph(graph: SplitSnapshotGraph) -> Self { Self { address: Arc::new(Mutex::new(WorkspaceSnapshotAddress::nil())), read_only_graph: Arc::new(graph), working_copy: Arc::new(RwLock::new(None)), cycle_check: Arc::new(AtomicBool::new(false)), dvu_roots: Arc::new(Mutex::new(HashSet::new())), prop_suggestions: Arc::new(PropSuggestionsCache::default()), } } pub async fn subgraph_count(&self) -> usize { self.working_copy().await.subgraph_count() } fn add_category_nodes(graph: &mut SplitSnapshotGraphVCurrent) -> WorkspaceSnapshotResult<Ulid> { let mut view_category_id = Ulid::new(); for category_node_kind in CategoryNodeKind::iter() { let (id, lineage_id) = if let Some(default_id) = category_node_kind.static_id() { (default_id, default_id) } else { (Ulid::new(), Ulid::new()) }; if category_node_kind == CategoryNodeKind::View { view_category_id = id; } graph.add_or_replace_node(NodeWeight::Category(CategoryNodeWeight::new( id, lineage_id, category_node_kind, )))?; graph.add_edge( graph.root_id()?, EdgeWeight::new(EdgeWeightKind::new_use()), id, )?; } Ok(view_category_id) } async fn add_default_view( ctx: &DalContext, graph: &mut SplitSnapshotGraphVCurrent, view_category_id: Ulid, ) -> WorkspaceSnapshotResult<()> { let id = Ulid::new(); let lineage_id = Ulid::new(); let content = ViewContent::V1(ViewContentV1 { timestamp: Timestamp::now(), name: "DEFAULT".to_owned(), }); let (content_address, _) = ctx.layer_db().cas().write( Arc::new(content.clone().into()), None, ctx.events_tenancy(), ctx.events_actor(), )?; let node_weight = NodeWeight::new_view(id, lineage_id, content_address); graph.add_or_replace_node(node_weight.clone())?; graph.add_edge( view_category_id, EdgeWeight::new(EdgeWeightKind::new_use_default()), id, )?; Ok(()) } pub async fn initial(ctx: &DalContext, split_max: usize) -> WorkspaceSnapshotResult<Self> { let mut graph = SplitSnapshotGraphVCurrent::new(split_max); let view_category_id = Self::add_category_nodes(&mut graph)?; Self::add_default_view(ctx, &mut graph, view_category_id).await?; // We do not care about any field other than "working_copy" because // "write" will populate them using the assigned working copy. let initial = Self { address: Arc::new(Mutex::new(WorkspaceSnapshotAddress::nil())), read_only_graph: Arc::new(SplitSnapshotGraph::V1(graph)), working_copy: Arc::new(RwLock::new(None)), cycle_check: Arc::new(AtomicBool::new(false)), dvu_roots: Arc::new(Mutex::new(HashSet::new())), prop_suggestions: Arc::new(PropSuggestionsCache::default()), }; initial.write(ctx).await?; Ok(initial) } pub async fn current_rebase_batch( &self, ) -> WorkspaceSnapshotResult<Option<SplitRebaseBatchVCurrent>> { let self_clone = self.clone(); let updates = slow_rt::spawn(async move { let mut working_copy = self_clone.working_copy_mut().await; working_copy.cleanup_and_merkle_tree_hash(); self_clone.read_only_graph.detect_updates(&working_copy) })? .await?; Ok((!updates.is_empty()).then_some(updates)) } pub async fn detect_updates( &self, updated: &Self, ) -> WorkspaceSnapshotResult<SplitRebaseBatchVCurrent> { let self_clone = self.clone(); let updated_clone = updated.clone(); Ok(slow_rt::spawn(async move { self_clone .working_copy() .await .detect_updates(&*updated_clone.working_copy().await) })? .await?) } #[instrument( name = "split_snapshot.calculate_rebase_batch", level = "info", skip_all )] pub async fn calculate_rebase_batch( base_snapshot: Arc<Self>, updated_snapshot: Arc<Self>, ) -> WorkspaceSnapshotResult<Option<SplitRebaseBatchVCurrent>> { let updates = slow_rt::spawn(async move { base_snapshot.detect_updates(&updated_snapshot).await })? .await??; Ok((!updates.is_empty()).then_some(updates)) } #[instrument(name = "split_snapshot.find", level = "debug", skip_all, fields())] pub async fn find( ctx: &DalContext, split_snapshot_supergraph_addr: WorkspaceSnapshotAddress, ) -> WorkspaceSnapshotResult<Self> { let snapshot = match ctx .layer_db() .split_snapshot_supergraph() .read_wait_for_memory(&split_snapshot_supergraph_addr) .await { Ok(supergraph) => { let supergraph = supergraph.ok_or( WorkspaceSnapshotError::SplitSnapshotSuperGraphMissingAtAddress( split_snapshot_supergraph_addr, ), )?; let mut subgraphs = vec![]; for &subgraph_address in supergraph.addresses() { let subgraph_address = subgraph_address.into(); let subgraph = ctx .layer_db() .split_snapshot_subgraph() .read_wait_for_memory(&subgraph_address) .await? .ok_or( WorkspaceSnapshotError::SplitSnapshotSubGraphMissingAtAddress( subgraph_address, ), )?; // xxx: we have to make the splitgraph constructable from arcs, it will // xxx: need to handle the copy-on-write behavior internally subgraphs.push(subgraph.as_ref().clone()); } Arc::new(SplitSnapshotGraph::V1( SplitSnapshotGraphVCurrent::from_parts(supergraph.as_ref().clone(), subgraphs), )) } Err(err) => match err { LayerDbError::Postcard(_) => { return Err(WorkspaceSnapshotError::WorkspaceSnapshotNotMigrated( split_snapshot_supergraph_addr, )); } err => Err(err)?, }, }; Ok(Self { address: Arc::new(Mutex::new(split_snapshot_supergraph_addr)), read_only_graph: snapshot, working_copy: Arc::new(RwLock::new(None)), cycle_check: Arc::new(AtomicBool::new(false)), dvu_roots: Arc::new(Mutex::new(HashSet::new())), prop_suggestions: Arc::new(PropSuggestionsCache::default()), }) } pub async fn find_for_change_set( ctx: &DalContext, change_set_id: ChangeSetId, ) -> WorkspaceSnapshotResult<Self> { // There's a race between finding which address to retrieve and actually retrieving it // where it's possible for the content at the address to be garbage collected, and no // longer be retrievable. We'll re-fetch which snapshot address to use, and will retry, // hoping we don't get unlucky every time let mut retries: u8 = 5; while retries > 0 { retries -= 1; let row = ctx .txns() .await? .pg() .query_opt( "SELECT workspace_snapshot_address FROM change_set_pointers WHERE id = $1", &[&change_set_id], ) .await? .ok_or( WorkspaceSnapshotError::ChangeSetMissingWorkspaceSnapshotAddress(change_set_id), )?; let address: WorkspaceSnapshotAddress = row.try_get("workspace_snapshot_address")?; match Self::find(ctx, address).await { Ok(snapshot) => return Ok(snapshot), Err( WorkspaceSnapshotError::SplitSnapshotSuperGraphMissingAtAddress(_) | WorkspaceSnapshotError::SplitSnapshotSubGraphMissingAtAddress(_), ) => { warn!( "Unable to retrieve split snapshot {:?} for change set {:?}. Retries remaining: {}", address, change_set_id, retries ); tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; continue; } Err(e) => return Err(e), } } error!( "Retries exceeded trying to fetch split snapshot for change set {:?}", change_set_id ); Err(WorkspaceSnapshotError::WorkspaceSnapshotNotFetched) } pub async fn has_mutable_working_copy(&self) -> bool { self.working_copy.read().await.is_some() } // pub async fn write_to_files(&self, read_only: bool, rebase_batch_bytes: Option<Vec<u8>>) { // std::fs::create_dir_all("snapshot_dumps").unwrap_or_else(|_| { // warn!("Directory snapshot_dumps already exists or could not be created"); // }); // let working_copy = if read_only { // self.read_only_graph.as_ref() // } else { // self.cleanup_and_merkle_tree_hash().await.unwrap(); // &*self.working_copy_mut().await // }; // let prefix = if read_only { // "read_only" // } else { // "working_copy" // }; // // Write the supergraph // if let Ok(file) = // std::fs::File::create(format!("snapshot_dumps/{prefix}_supergraph.snapshot")) // { // if let Ok((bytes, _)) = // si_layer_cache::db::serialize::to_vec(&working_copy.supergraph()) // { // if let Err(e) = // std::io::Write::write_all(&mut std::io::BufWriter::new(file), &bytes) // { // error!("Failed to write supergraph to file: {}", e); // } // } else { // println!("Failed to serialize supergraph"); // } // } // // Write each subgraph // for (i, subgraph) in working_copy.subgraphs().iter().enumerate() { // let filename = format!("snapshot_dumps/{prefix}_subgraph_{i}.snapshot"); // if let Ok(file) = std::fs::File::create(&filename) { // match si_layer_cache::db::serialize::to_vec(&subgraph) { // Ok((bytes, _)) => { // match std::io::Write::write_all(&mut std::io::BufWriter::new(file), &bytes) // { // Ok(_) => {} // Err(e) => { // println!("Failed to write subgraph {} to file: {}", i, e); // } // } // } // Err(e) => { // println!("Failed to serialize subgraph {}: {}", i, e); // } // } // } else { // println!("Failed to create file {}", filename); // } // } // // Write the rebase batch to a file if it exists // if let Some(bytes) = rebase_batch_bytes { // let filename = "snapshot_dumps/rebase_batch.rbatch"; // if let Ok(file) = std::fs::File::create(filename) { // match std::io::Write::write_all(&mut std::io::BufWriter::new(file), &bytes) { // Ok(_) => { // println!("Wrote rebase batch to {}", filename); // } // Err(e) => { // println!("Failed to write rebase batch to file: {}", e); // } // } // } else { // println!("Failed to create file {}", filename); // } // } // println!("Wrote working_copy supergraph and subgraphs to snapshot_dumps directory"); // } pub async fn write( &self, ctx: &DalContext, ) -> WorkspaceSnapshotResult<WorkspaceSnapshotAddress> { let self_clone = self.clone(); let layer_db = ctx.layer_db().clone(); let events_tenancy = ctx.events_tenancy(); let events_actor = ctx.events_actor(); let supergraph_address = slow_rt::spawn(async move { let mut working_copy = self_clone.working_copy_mut().await; let start = Instant::now(); working_copy.cleanup_and_merkle_tree_hash(); warn!("cleaned up working copy in {:?}", start.elapsed()); warn!( "current addresses: {:?}", working_copy.supergraph().addresses() ); let current_supergraph = working_copy.supergraph(); let mut new_supergraph = SuperGraph::new( current_supergraph.split_max(), current_supergraph.root_index(), current_supergraph.external_source_map().clone(), ); let mut join_set = JoinSet::new(); let subgraph_indexes = OptZip::new( self_clone .read_only_graph .subgraphs() .iter() .enumerate() .map(|(idx, _)| idx), working_copy .subgraphs() .iter() .enumerate() .map(|(idx, _)| idx), ) .collect::<Vec<_>>(); drop(working_copy); for (original_subgraph_idx, new_subgraph_idx) in subgraph_indexes { let self_clone_clone = self_clone.clone(); let layer_db_clone = layer_db.clone(); join_set.spawn(async move { let start = Instant::now(); let original_subgraph = original_subgraph_idx.and_then(|index| { self_clone_clone .read_only_graph .subgraphs() .get(index) .map(|subgraph| (index, subgraph)) }); let working_copy = self_clone_clone.working_copy().await; let new_subgraph = match new_subgraph_idx { Some(index) => working_copy .subgraphs() .get(index) .map(|subgraph| (index, subgraph)), None => None, }; let subgraph_address_and_index = match (original_subgraph, new_subgraph) { (Some((orig_idx, orig)), Some((_, working))) => { if orig.root_node_merkle_tree_hash() != working.root_node_merkle_tree_hash() { let (new_address, _) = layer_db_clone.split_snapshot_subgraph().write( Arc::new(working.clone()), None, events_tenancy, events_actor, )?; warn!( "rewrote subgraph in {:?} new address {:?}", start.elapsed(), new_address ); (orig_idx, new_address) } else { let subgraph_address: WorkspaceSnapshotAddress = match self_clone_clone .read_only_graph .supergraph() .address_for_subgraph(orig_idx) { Some(addr) => addr.into(), None => { let (new_address, _) = layer_db_clone.split_snapshot_subgraph().write( Arc::new(working.clone()), None, events_tenancy, events_actor, )?; new_address } }; (orig_idx, subgraph_address) } } (None, Some((new_index, working))) => { let (new_address, _) = layer_db_clone.split_snapshot_subgraph().write( Arc::new(working.clone()), None, events_tenancy, events_actor, )?; warn!( "wrote new subgraph in {:?}, address: {:?}", start.elapsed(), new_address ); (new_index, new_address) } (Some(_), None) => { todo!("we've removed a subgraph") } (None, None) => unreachable!("opt zip will never produce this"), }; Ok::<_, WorkspaceSnapshotError>(subgraph_address_and_index) }); } let mut subgraph_addresses = vec![]; // Join all returns in the order that futures complete, not the order they were spawned, so we have to sort for result in join_set.join_all().await { let address_and_index = result?; subgraph_addresses.push(address_and_index); } subgraph_addresses.sort_by_key(|(index, _)| *index); for (_, address) in subgraph_addresses { new_supergraph.add_subgraph_address(address.into()); } warn!("new subgraph_addresses: {:?}", new_supergraph.addresses()); let start = Instant::now(); let (supergraph_address, _) = layer_db.split_snapshot_supergraph().write( Arc::new(new_supergraph), None, events_tenancy, events_actor, )?; warn!( "wrote supergraph in {:?}, new address: {:?}", start.elapsed(), supergraph_address ); Ok::<WorkspaceSnapshotAddress, WorkspaceSnapshotError>(supergraph_address) })? .await??; *self.address.lock().await = supergraph_address; Ok(supergraph_address) } async fn working_copy(&self) -> SnapshotReadGuard<'_> { SnapshotReadGuard { read_only_graph: self.read_only_graph.clone(), working_copy_read_guard: self.working_copy.read().await, } } async fn working_copy_mut(&self) -> SnapshotWriteGuard<'_> { let mut working_copy = self.working_copy.write().await; if working_copy.is_none() { *working_copy = Some(self.read_only_graph.inner().clone()); } SnapshotWriteGuard { working_copy_write_guard: working_copy, } } pub async fn root(&self) -> WorkspaceSnapshotResult<Ulid> { Ok(self.working_copy().await.root_id()?) } pub async fn generate_ulid(&self) -> WorkspaceSnapshotResult<Ulid> { // XXX: do we need to use the generator for monotonically increasing IDs? is that really necessary? Ok(Ulid::new()) } pub async fn enable_cycle_check(&self) -> CycleCheckGuard { self.cycle_check .store(true, std::sync::atomic::Ordering::Relaxed); CycleCheckGuard::new(self.cycle_check.clone()) } pub async fn disable_cycle_check(&self) { self.cycle_check .store(false, std::sync::atomic::Ordering::Relaxed); } pub async fn cycle_check(&self) -> bool { self.cycle_check.load(std::sync::atomic::Ordering::Relaxed) } pub async fn is_acyclic_directed(&self) -> bool { self.working_copy().await.is_acyclic_directed() } pub async fn add_or_replace_node(&self, node: NodeWeight) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.add_or_replace_node(node)?; Ok(()) } pub async fn add_ordered_node(&self, node: NodeWeight) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.add_ordered_node(node)?; Ok(()) } pub async fn update_content( &self, id: Ulid, new_content_hash: ContentHash, ) -> WorkspaceSnapshotResult<()> { let mut working_copy = self.working_copy_mut().await; match working_copy.node_weight_mut(id) { Some(node_weight_mut) => { node_weight_mut.new_content_hash(new_content_hash)?; working_copy.touch_node(id); Ok(()) } None => Err(WorkspaceSnapshotError::NodeNotFoundAtId(id)), } } pub async fn add_edge( &self, from_id: impl Into<Ulid>, edge_weight: EdgeWeight, to_id: impl Into<Ulid>, ) -> WorkspaceSnapshotResult<()> { if self.cycle_check().await { self.working_copy_mut().await.add_edge_with_cycle_check( from_id.into(), edge_weight, to_id.into(), )?; } else { self.working_copy_mut() .await .add_edge(from_id.into(), edge_weight, to_id.into())?; } Ok(()) } pub async fn add_ordered_edge( &self, from_id: impl Into<Ulid>, edge_weight: EdgeWeight, to_id: impl Into<Ulid>, ) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.add_ordered_edge( from_id.into(), edge_weight, to_id.into(), )?; Ok(()) } pub async fn detect_changes( &self, updated_snapshot: &Self, ) -> WorkspaceSnapshotResult<Vec<Change>> { Ok(self .working_copy() .await .detect_changes(&*updated_snapshot.working_copy().await)?) } pub async fn perform_updates(&self, updates: &[UpdateVCurrent]) -> WorkspaceSnapshotResult<()> { let self_clone = self.clone(); let updates = updates.to_vec(); Ok(slow_rt::spawn(async move { self_clone .working_copy_mut() .await .perform_updates(&updates) })? .await?) } pub async fn import_component_subgraph( &self, _other: &Arc<Self>, _component_id: ComponentId, ) -> WorkspaceSnapshotResult<()> { // XXX: Implement import component subgraph Ok(()) } pub async fn raw_node_weight( &self, id: impl Into<Ulid>, ) -> Option<SplitGraphNodeWeight<NodeWeight>> { self.working_copy() .await .raw_node_weight(id.into()) .cloned() } pub async fn get_node_weight( &self, id: impl Into<Ulid>, ) -> WorkspaceSnapshotResult<NodeWeight> { let id = id.into(); self.get_node_weight_opt(id) .await .ok_or(WorkspaceSnapshotError::NodeNotFoundAtId(id)) } pub async fn split_graph_node_index(&self, id: impl Into<Ulid>) -> Option<SplitGraphNodeIndex> { self.working_copy().await.node_id_to_index(id.into()) } pub async fn get_node_weight_opt(&self, id: impl Into<Ulid>) -> Option<NodeWeight> { self.working_copy().await.node_weight(id.into()).cloned() } pub async fn cleanup(&self) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.cleanup(); Ok(()) } pub async fn cleanup_and_merkle_tree_hash(&self) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.cleanup_and_merkle_tree_hash(); Ok(()) } pub async fn nodes(&self) -> WorkspaceSnapshotResult<Vec<NodeWeight>> { Ok(self.working_copy().await.nodes().cloned().collect()) } pub async fn edges(&self) -> WorkspaceSnapshotResult<Vec<(EdgeWeight, Ulid, Ulid)>> { Ok(self .working_copy() .await .edges() .map(|(weight, src, dst)| (weight.clone(), src, dst)) .collect()) } pub async fn node_exists(&self, id: impl Into<Ulid>) -> bool { self.working_copy() .await .node_id_to_index(id.into()) .is_some() } pub async fn get_category_node_or_err( &self, kind: CategoryNodeKind, ) -> WorkspaceSnapshotResult<Ulid> { self.get_category_node(kind) .await? .ok_or(WorkspaceSnapshotError::CategoryNodeNotFound(kind)) } pub async fn get_category_node( &self, kind: CategoryNodeKind, ) -> WorkspaceSnapshotResult<Option<Ulid>> { let working_copy = self.working_copy().await; let source_id = working_copy.root_id()?; Ok(working_copy .edges_directed(source_id, Outgoing)? .find( |edge_ref| match working_copy.node_weight(edge_ref.target()) { Some(NodeWeight::Category(category_node)) => category_node.kind() == kind, _ => false, }, ) .map(|edge_ref| edge_ref.target())) } pub async fn edges_directed_debug( &self, id: impl Into<Ulid>, direction: Direction, ) -> WorkspaceSnapshotResult<Vec<(EdgeWeight, Ulid, Ulid)>> { Ok(self .working_copy() .await .edges_directed(id.into(), direction)? .map(|edge_ref| { ( edge_ref.weight().clone(), edge_ref.source(), edge_ref.target(), ) }) .collect()) } pub async fn edges_directed( &self, id: impl Into<Ulid>, direction: Direction, ) -> WorkspaceSnapshotResult<Vec<(EdgeWeight, Ulid, Ulid)>> { Ok(self .working_copy() .await .edges_directed(id.into(), direction)? .map(|edge_ref| { ( edge_ref.weight().clone(), edge_ref.source(), edge_ref.target(), ) }) .collect()) } pub async fn edges_directed_for_edge_weight_kind( &self, id: impl Into<Ulid>, direction: Direction, edge_kind: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult<Vec<(EdgeWeight, Ulid, Ulid)>> { Ok(self .working_copy() .await .edges_directed_for_edge_weight_kind(id.into(), direction, edge_kind)? .map(|edge_ref| { ( edge_ref.weight().clone(), edge_ref.source(), edge_ref.target(), ) }) .collect()) } pub async fn remove_all_edges(&self, id: impl Into<Ulid>) -> WorkspaceSnapshotResult<()> { // Removing all edges to and from a node is the same as removing the node // the remove node method will handle the bookkeeping necessary for recalculating the // merkle tree self.working_copy_mut().await.remove_node(id.into())?; Ok(()) } pub async fn incoming_sources_for_edge_weight_kind( &self, id: impl Into<Ulid>, edge_weight_kind_discrim: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult<Vec<Ulid>> { Ok(self .working_copy() .await .edges_directed_for_edge_weight_kind(id.into(), Incoming, edge_weight_kind_discrim)? .map(|edge_ref| edge_ref.source()) .collect()) } pub async fn source_opt( &self, id: impl Into<Ulid>, kind: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult<Option<Ulid>> { Ok(self .working_copy() .await .directed_unique_neighbor_of_edge_weight_kind(id.into(), Incoming, kind)?) } pub async fn outgoing_targets_for_edge_weight_kind( &self, id: impl Into<Ulid>, edge_weight_kind_discrim: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult<Vec<Ulid>> { Ok(self .working_copy() .await .edges_directed_for_edge_weight_kind(id.into(), Outgoing, edge_weight_kind_discrim)? .map(|edge_ref| edge_ref.target()) .collect()) } pub async fn all_outgoing_targets( &self, id: impl Into<Ulid>, ) -> WorkspaceSnapshotResult<Vec<NodeWeight>> { let working_copy = self.working_copy().await; let targets = working_copy .edges_directed(id.into(), Outgoing)? .filter_map(|edge_ref| working_copy.node_weight(edge_ref.target())) .cloned() .collect(); Ok(targets) } pub async fn all_incoming_sources( &self, id: impl Into<Ulid>, ) -> WorkspaceSnapshotResult<Vec<NodeWeight>> { let working_copy = self.working_copy().await; let sources = working_copy .edges_directed(id.into(), Incoming)? .filter_map(|edge_ref| working_copy.node_weight(edge_ref.source())) .cloned() .collect(); Ok(sources) } pub async fn remove_incoming_edges_of_kind( &self, target_id: impl Into<Ulid>, kind: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult<()> { let target_id = target_id.into(); let mut working_copy = self.working_copy_mut().await; let sources = working_copy .edges_directed_for_edge_weight_kind(target_id, Incoming, kind)? .map(|edge_ref| edge_ref.source()) .collect_vec(); for source_id in sources { working_copy.remove_edge(source_id, kind, target_id)?; } Ok(()) } pub async fn remove_outgoing_edges_of_kind( &self, source_id: impl Into<Ulid>, kind: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult<()> { let source_id = source_id.into(); let mut working_copy = self.working_copy_mut().await; let targets = working_copy .edges_directed_for_edge_weight_kind(source_id, Incoming, kind)? .map(|edge_ref| edge_ref.target()) .collect_vec(); for target_id in targets { working_copy.remove_edge(source_id, kind, target_id)?; } Ok(()) } pub async fn get_edges_between_nodes( &self, from_node_id: Ulid, to_node_id: Ulid, ) -> WorkspaceSnapshotResult<Vec<EdgeWeight>> { Ok(self .working_copy() .await .edges_directed(from_node_id, Outgoing)? .filter(|edge_ref| edge_ref.target() == to_node_id) .map(|edge_ref| edge_ref.weight().clone()) .collect()) } pub async fn remove_node_by_id(&self, id: impl Into<Ulid>) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.remove_node(id.into())?; Ok(()) } pub async fn remove_edge( &self, source_id: impl Into<Ulid>, target_id: impl Into<Ulid>, edge_kind: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult<()> { self.working_copy_mut() .await .remove_edge(source_id.into(), edge_kind, target_id.into())?; Ok(()) } pub async fn find_edge( &self, from_id: impl Into<Ulid>, to_id: impl Into<Ulid>, edge_weight_kind: EdgeWeightKindDiscriminants, ) -> Option<EdgeWeight> { self.working_copy() .await .find_edge(from_id.into(), to_id.into(), edge_weight_kind) .cloned() } pub async fn update_node_id( &self, current_id: impl Into<Ulid>, new_id: impl Into<Ulid>, new_lineage_id: LineageId, ) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.update_node_id( current_id.into(), new_id.into(), new_lineage_id, )?; Ok(()) } pub async fn ordered_children_for_node( &self, id: impl Into<Ulid>, ) -> WorkspaceSnapshotResult<Option<Vec<Ulid>>> { Ok(self.working_copy().await.ordered_children(id.into())) } pub async fn dvu_root_check(&self, root: DependentValueRoot) -> bool { // ensure we don't grow the graph unnecessarily by adding the same value // in a single edit session let mut dvu_roots = self.dvu_roots.lock().await; if dvu_roots.contains(&root) { true } else { dvu_roots.insert(root); false } } pub async fn schema_variant_id_for_component_id( &self, component_id: ComponentId, ) -> ComponentResult<SchemaVariantId> { let component_id = component_id.into(); let working_copy = self.working_copy().await; if working_copy.node_id_to_index(component_id).is_none() { return Err(ComponentError::NotFound(component_id.into())); } let sv_id = working_copy .edges_directed(component_id, Outgoing)? .find(|edge_ref| { matches!(edge_ref.weight().kind(), EdgeWeightKind::Use { .. }) && matches!( working_copy.node_weight(edge_ref.target()), Some(NodeWeight::SchemaVariant(_)) ) }) .map(|edge_ref| edge_ref.target().into()) .ok_or(ComponentError::SchemaVariantNotFound(component_id.into()))?; Ok(sv_id) } /// Get the prop suggestions cache for autosubscribe optimization pub async fn prop_suggestions_cache( &self, ctx: &DalContext, ) -> WorkspaceSnapshotResult<&PropSuggestionsCache> { if !self.prop_suggestions.populated() { self.prop_suggestions.populate(ctx).await?; } Ok(&self.prop_suggestions) } /// Get the prop suggestions cache for autosubscribe optimization without populating it pub async fn prop_suggestions_cache_no_populate( &self, ) -> WorkspaceSnapshotResult<&PropSuggestionsCache> { Ok(&self.prop_suggestions) } /// Clear the prop suggestions cache (useful when schema variants change) pub fn clear_prop_suggestions_cache(&self) { self.prop_suggestions.clear(); } pub async fn revert(&self) { let mut working_copy = self.working_copy.write().await; if working_copy.is_some() { *working_copy = None; } } pub async fn correct_transforms( &self, updates: Vec<UpdateVCurrent>, from_different_change_set: bool, ) -> WorkspaceSnapshotResult<Vec<UpdateVCurrent>> { Ok(correct_transforms( &*self.working_copy().await, updates, from_different_change_set, )?) } /// Get the target node of the outgoing edge of the given kind /// Returns an error if there is more than one matching edge pub async fn target_opt( &self, node_id: Ulid, kind: impl Into<EdgeWeightKindDiscriminants>, ) -> WorkspaceSnapshotResult<Option<Ulid>> { let kind = kind.into(); let results = self .outgoing_targets_for_edge_weight_kind(node_id, kind) .await?; if results.len() > 1 { return Err(WorkspaceSnapshotError::TooManyEdgesOfKind(node_id, kind)); } Ok(results.first().copied()) } } #[async_trait] impl ApprovalRequirementExt for SplitSnapshot { async fn new_definition( &self, _ctx: &DalContext, _entity_id: Ulid, _minimum_approvers_count: usize, _approvers: HashSet<ApprovalRequirementApprover>, ) -> WorkspaceSnapshotResult<ApprovalRequirementDefinitionId> { // XXX: implement Ok(ApprovalRequirementDefinitionId::new()) } async fn remove_definition( &self, _approval_requirement_definition_id: ApprovalRequirementDefinitionId, ) -> WorkspaceSnapshotResult<()> { // XXX: implement Ok(()) } async fn add_individual_approver_for_definition( &self, _ctx: &DalContext, _id: ApprovalRequirementDefinitionId, _user_id: UserPk, ) -> WorkspaceSnapshotResult<()> { Ok(()) } async fn remove_individual_approver_for_definition( &self, _ctx: &DalContext, _id: ApprovalRequirementDefinitionId, _user_id: UserPk, ) -> WorkspaceSnapshotResult<()> { Ok(()) } async fn approval_requirements_for_changes( &self, _ctx: &DalContext, _changes: &[Change], ) -> WorkspaceSnapshotResult<(Vec<ApprovalRequirement>, HashMap<EntityId, MerkleTreeHash>)> { Ok((Vec::new(), HashMap::new())) } async fn approval_requirement_definitions_for_entity_id_opt( &self, _ctx: &DalContext, _entity_id: EntityId, ) -> WorkspaceSnapshotResult<Option<Vec<ApprovalRequirementDefinition>>> { Ok(None) } async fn entity_id_for_approval_requirement_definition_id( &self, _id: ApprovalRequirementDefinitionId, ) -> WorkspaceSnapshotResult<EntityId> { Ok(EntityId::new()) } async fn get_approval_requirement_definition_by_id( &self, _ctx: &DalContext, _id: ApprovalRequirementDefinitionId, ) -> WorkspaceSnapshotResult<ApprovalRequirementDefinition> { Ok(ApprovalRequirementDefinition::fake()) } } #[async_trait] impl InputSocketExt for SplitSnapshot { async fn get_input_socket( &self, ctx: &DalContext, id: InputSocketId, ) -> WorkspaceSnapshotResult<InputSocket> { let working_copy = self.working_copy().await; let node_weight = working_copy .node_weight(id.into()) .ok_or(WorkspaceSnapshotError::NodeNotFoundAtId(id.into()))?; let input_socket_node_weight = match node_weight { NodeWeight::InputSocket(input_socket_node_weight) => input_socket_node_weight, unexpected => { return Err(NodeWeightError::UnexpectedNodeWeightVariant( unexpected.into(), NodeWeightDiscriminants::InputSocket, ))?; } }; Ok(input_socket_from_node_weight(ctx, input_socket_node_weight) .await .map_err(Box::new)?) } async fn get_input_socket_by_name_opt( &self, ctx: &DalContext, name: &str, schema_variant_id: SchemaVariantId, ) -> WorkspaceSnapshotResult<Option<InputSocket>> { Ok(self .list_input_sockets(ctx, schema_variant_id) .await? .into_iter() .find(|socket| socket.name() == name)) } async fn list_input_socket_ids_for_schema_variant( &self, schema_variant_id: SchemaVariantId, ) -> WorkspaceSnapshotResult<Vec<InputSocketId>> { let working_copy = self.working_copy().await; let result: Vec<_> = working_copy .edges_directed_for_edge_weight_kind( schema_variant_id.into(), Outgoing, EdgeWeightKindDiscriminants::Socket, )? .filter_map( |edge_ref| match working_copy.node_weight(edge_ref.target()) { Some(NodeWeight::InputSocket(_)) => Some(edge_ref.target().into()), _ => None, }, ) .collect(); Ok(result) } async fn list_input_sockets( &self, ctx: &DalContext, schema_variant_id: SchemaVariantId, ) -> WorkspaceSnapshotResult<Vec<InputSocket>> { let working_copy = self.working_copy().await; let input_sockets = working_copy .edges_directed_for_edge_weight_kind( schema_variant_id.into(), Outgoing, EdgeWeightKindDiscriminants::Socket, )? .filter_map( |edge_ref| match working_copy.node_weight(edge_ref.target()) { Some(NodeWeight::InputSocket(inner)) => Some(inner), _ => None, }, ); let mut result = vec![]; for input_socket_node_weight in input_sockets { result.push( input_socket_from_node_weight(ctx, input_socket_node_weight) .await .map_err(Box::new)?, ); } Ok(result) } async fn all_attribute_value_ids_everywhere_for_input_socket_id( &self, input_socket_id: InputSocketId, ) -> WorkspaceSnapshotResult<Vec<AttributeValueId>> { let working_copy = self.working_copy().await; let result: Vec<_> = working_copy .edges_directed_for_edge_weight_kind( input_socket_id.into(), Incoming, EdgeWeightKindDiscriminants::Socket, )? .filter_map( |edge_ref| match working_copy.node_weight(edge_ref.source()) { Some(NodeWeight::AttributeValue(_)) => Some(edge_ref.source().into()), _ => None, }, ) .collect(); Ok(result) } async fn component_attribute_value_id_for_input_socket_id( &self, input_socket_id: InputSocketId, component_id: ComponentId, ) -> WorkspaceSnapshotResult<AttributeValueId> { let working_copy = self.working_copy().await; let mut result = None; for socket_value_edge_ref in working_copy.edges_directed_for_edge_weight_kind( component_id.into(), Outgoing, EdgeWeightKindDiscriminants::SocketValue, )? { for _ in working_copy .edges_directed_for_edge_weight_kind( socket_value_edge_ref.target(), Outgoing, EdgeWeightKindDiscriminants::Socket, )? .filter(|edge_ref| input_socket_id == edge_ref.target().into()) { if result.is_some() { return Err(Box::new(InputSocketError::FoundTooManyForInputSocketId( input_socket_id, component_id, )) .into()); } result = Some(socket_value_edge_ref.target().into()); } } if let Some(av_id) = result { Ok(av_id) } else { Err( Box::new(InputSocketError::MissingAttributeValueForComponent( input_socket_id, component_id, )) .into(), ) } } async fn input_socket_id_find_for_attribute_value_id( &self, attribute_value_id: AttributeValueId, ) -> WorkspaceSnapshotResult<Option<InputSocketId>> { let working_copy = self.working_copy().await; let mut result = None; let av_as_ulid = attribute_value_id.into(); for edge_ref in working_copy.edges_directed_for_edge_weight_kind( av_as_ulid, Outgoing, EdgeWeightKindDiscriminants::Socket, )? { if result.is_some() { return Err(Box::new( InputSocketError::MultipleSocketsForAttributeValue(attribute_value_id), ))?; } result = Some(edge_ref.target().into()); } Ok(result) } } pub fn schema_ids_for_schema_variant_id( graph: &SplitSnapshotGraphVCurrent, schema_variant_id: SchemaVariantId, ) -> SplitGraphResult<Vec<SchemaId>> { let sv_ulid = schema_variant_id.into(); Ok(graph .edges_directed_for_edge_weight_kind(sv_ulid, Incoming, EdgeWeightKindDiscriminants::Use)? .filter_map(|edge_ref| match graph.node_weight(edge_ref.source()) { Some(NodeWeight::Content(content)) if content.content_address_discriminants() == ContentAddressDiscriminants::Schema => { Some(edge_ref.source().into()) } _ => None, }) .collect()) } #[async_trait] impl SchemaVariantExt for SplitSnapshot { async fn schema_id_for_schema_variant_id( &self, schema_variant_id: SchemaVariantId, ) -> WorkspaceSnapshotResult<SchemaId> { let working_copy = self.working_copy().await; let schemas = schema_ids_for_schema_variant_id(&working_copy, schema_variant_id)?; let schema_id = schemas .first() .ok_or_else(|| Box::new(SchemaVariantError::SchemaNotFound(schema_variant_id)))?; if schemas.len() > 1 { return Err(Box::new(SchemaVariantError::MoreThanOneSchemaFound( schema_variant_id, )))?; } Ok(schema_id.into()) } async fn schema_variant_add_edge_to_input_socket( &self, schema_variant_id: SchemaVariantId, input_socket_id: InputSocketId, ) -> WorkspaceSnapshotResult<()> { let mut working_copy = self.working_copy_mut().await; working_copy.add_edge( schema_variant_id.into(), EdgeWeight::new(EdgeWeightKind::Socket), input_socket_id.into(), )?; Ok(()) } } #[async_trait] impl EntityKindExt for SplitSnapshot { async fn get_entity_kind_for_id(&self, id: EntityId) -> EntityKindResult<EntityKind> { self.working_copy() .await .node_weight(id.into()) .map(Into::into) .ok_or(EntityKindError::NodeNotFound(id)) } } #[async_trait] impl ViewExt for SplitSnapshot { async fn view_remove(&self, view_id: ViewId) -> WorkspaceSnapshotResult<()> { // If there are any Components remaining in the View, this View _CANNOT_ be the only View they // are in. If this View is the only View _ANY_ of the items are in, we do not allow removal // of the View. // View --Use--> Geometry --Represents--> // {Component, DiagramObject <--DiagramObject-- View (on canvas)} // // Component <--Represents-- Geometry <--Use-- View // // View (on canvas) --DiagramObject--> DiagramObject <--Represents-- Geometry <--Use-- View let mut working_copy = self.working_copy_mut().await; let mut would_be_orphaned_component_ids = Vec::new(); let view_id: Ulid = view_id.into(); // Do not allow deletion of the default view. We will likely need to get rid of this when // we change what views do. for edge_ref in working_copy.edges_directed_for_edge_weight_kind( view_id, Incoming, EdgeWeightKindDiscriminants::Use, )? { let edge_weight = edge_ref.weight(); if *edge_weight.kind() == EdgeWeightKind::new_use_default() { return Err(WorkspaceSnapshotError::DefaultViewDeletionAttempt); } } // Find all geometries used by this view for view_use_edge_ref in working_copy.edges_directed_for_edge_weight_kind( view_id, Outgoing, EdgeWeightKindDiscriminants::Use, )? { let geometry_node_id = view_use_edge_ref.target(); // Find the "thing" this Geometry is representing, so we can make sure it is also // represented by at least one Geometry in another View. let Some(represented_thing_id) = working_copy .directed_unique_neighbor_of_edge_weight_kind( geometry_node_id, Outgoing, EdgeWeightKindDiscriminants::Represents, )? else { continue; }; let Some(represented_thing_node_weight) = working_copy.node_weight(represented_thing_id) else { continue; }; if represented_thing_node_weight.kind() != NodeWeightDiscriminants::Component { // Components _MUST_ be in another View for this View to be able to be removed. // Things with DiagramObjects (currently only Views) do not have to be part of // another View for this View to be able to be removed. continue; } let mut view_member_ids = HashSet::new(); for represents_edge_ref in working_copy.edges_directed_for_edge_weight_kind( represented_thing_id, Incoming, EdgeWeightKindDiscriminants::Represents, )? { let geometry_id = represents_edge_ref.source(); let geometry_view_id = working_copy .directed_unique_neighbor_of_edge_weight_kind( geometry_id, Incoming, EdgeWeightKindDiscriminants::Use, )? .ok_or(WorkspaceSnapshotError::NoEdgesOfKindFound( geometry_id, Incoming, EdgeWeightKindDiscriminants::Use, ))?; if geometry_view_id != view_id { view_member_ids.insert(geometry_view_id); } } if view_member_ids.is_empty() { would_be_orphaned_component_ids.push(represented_thing_node_weight.id()); } } if !would_be_orphaned_component_ids.is_empty() { return Err(WorkspaceSnapshotError::ViewRemovalWouldOrphanItems( would_be_orphaned_component_ids, )); } let mut nodes_to_delete = vec![view_id]; if let Some(diagram_object_id) = working_copy.directed_unique_neighbor_of_edge_weight_kind( view_id, Outgoing, EdgeWeightKindDiscriminants::DiagramObject, )? { // Find all geometry objects for this diagram object nodes_to_delete.extend( working_copy .edges_directed_for_edge_weight_kind( diagram_object_id, Incoming, EdgeWeightKindDiscriminants::Represents, )? .map(|edge_ref| edge_ref.source()), ); } for node_id in nodes_to_delete { working_copy.remove_node(node_id)?; } Ok(()) } async fn list_for_component_id(&self, id: ComponentId) -> WorkspaceSnapshotResult<Vec<ViewId>> { if !self.node_exists(id).await { return Ok(vec![]); } let mut view_ids_set = HashSet::new(); let working_copy = self.working_copy().await; for represents_edge_ref in working_copy.edges_directed_for_edge_weight_kind( id.into(), Incoming, EdgeWeightKindDiscriminants::Represents, )? { if let Some(view_id) = working_copy.directed_unique_neighbor_of_edge_weight_kind( represents_edge_ref.source(), Incoming, EdgeWeightKindDiscriminants::Use, )? { view_ids_set.insert(view_id); } } Ok(view_ids_set.into_iter().map(Into::into).collect()) } } #[async_trait] impl ComponentExt for SplitSnapshot { async fn root_attribute_value( &self, component_id: ComponentId, ) -> ComponentResult<AttributeValueId> { self.working_copy().await.root_attribute_value(component_id) } async fn external_source_count(&self, component_id: ComponentId) -> ComponentResult<usize> { self.working_copy() .await .external_source_count(component_id) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server