Skip to main content
Glama
diagram.rs28.4 kB
mod diagram_object; pub mod geometry; pub mod view; use std::{ collections::{ HashMap, HashSet, }, num::{ ParseFloatError, ParseIntError, }, }; use petgraph::prelude::*; use serde::{ Deserialize, Serialize, }; use si_data_pg::PgError; use si_frontend_types::{ DiagramComponentView, DiagramSocket, }; use si_id::{ AttributePrototypeId, AttributeValueId, ComponentId, GeometryId, SchemaId, SchemaVariantId, ViewId, }; use si_layer_cache::LayerDbError; use telemetry::prelude::*; use thiserror::Error; use crate::{ AttributePrototype, AttributeValue, ChangeSetError, Component, DalContext, EdgeWeightKind, EdgeWeightKindDiscriminants, FuncError, HelperError, NodeWeightDiscriminants, SchemaVariant, TransactionsError, Workspace, WorkspaceError, WorkspaceSnapshot, approval_requirement::ApprovalRequirementError, attribute::{ path::AttributePath, prototype::{ AttributePrototypeError, argument::{ AttributePrototypeArgument, AttributePrototypeArgumentError, AttributePrototypeArgumentId, }, }, value::AttributeValueError, }, change_status::ChangeStatus, component::ComponentError, diagram::{ geometry::{ Geometry, GeometryRepresents, }, view::{ View, ViewObjectView, }, }, schema::variant::SchemaVariantError, socket::{ input::InputSocketError, output::OutputSocketError, }, workspace_snapshot::{ WorkspaceSnapshotError, WorkspaceSnapshotSelector, node_weight::{ NodeWeight, NodeWeightError, category_node_weight::CategoryNodeKind, }, selector::WorkspaceSnapshotSelectorDiscriminants, split_snapshot::SplitSnapshot, }, }; #[remain::sorted] #[derive(Error, Debug)] pub enum DiagramError { #[error("approval requirement error: {0}")] ApprovalRequirement(#[from] Box<ApprovalRequirementError>), #[error("attribute prototype error: {0}")] AttributePrototype(#[from] Box<AttributePrototypeError>), #[error("attribute prototype argument error: {0}")] AttributePrototypeArgument(#[from] Box<AttributePrototypeArgumentError>), #[error("attribute prototype not found")] AttributePrototypeNotFound, #[error("attribute value error: {0}")] AttributeValue(#[from] Box<AttributeValueError>), #[error("attribute value not found")] AttributeValueNotFound, #[error("Change Set error: {0}")] ChangeSet(#[from] Box<ChangeSetError>), #[error("component error: {0}")] Component(#[from] Box<ComponentError>), #[error("component not found")] ComponentNotFound, #[error("component status not found for component: {0}")] ComponentStatusNotFound(ComponentId), #[error("default view not found")] DefaultViewNotFound, #[error("trying to delete only geometry (the one on view {0}) for component {1}")] DeletingLastGeometryForComponent(ViewId, ComponentId), #[error("deletion timestamp not found")] DeletionTimeStamp, #[error( "destination attribute prototype not found for inter component attribute prototype argument: {0}" )] DestinationAttributePrototypeNotFound(AttributePrototypeArgumentId), #[error( "destination input socket not found for attribute prototype ({0}) and inter component attribute prototype argument ({1})" )] DestinationInputSocketNotFound(AttributePrototypeId, AttributePrototypeArgumentId), #[error("more then one diagram object found for view {0}")] DiagramObjectMoreThanOneForView(ViewId), #[error("diagram object not found for view {0}")] DiagramObjectNotFoundForView(ViewId), #[error("edge not found")] EdgeNotFound, #[error("func error: {0}")] Func(#[from] Box<FuncError>), #[error("geometry can't represent: {0}")] GeometryCannotRepresentNodeWeight(NodeWeightDiscriminants), #[error("geometry not found: {0}")] GeometryNotFound(GeometryId), #[error("geometry not found for component {0} on view {1}")] GeometryNotFoundForComponentAndView(ComponentId, ViewId), #[error("geometry not found for view object {0} on view {1}")] GeometryNotFoundForViewObjectAndView(ViewId, ViewId), #[error("Helper error: {0}")] Helper(#[from] Box<HelperError>), #[error("input socket error: {0}")] InputSocket(#[from] Box<InputSocketError>), #[error("layerdb error: {0}")] LayerDb(#[from] LayerDbError), #[error("node not found")] NodeNotFound, #[error("node weight error: {0}")] NodeWeight(#[from] Box<NodeWeightError>), #[error("output socket error: {0}")] OutputSocket(#[from] Box<OutputSocketError>), #[error("parse float error: {0}")] ParseFloat(#[from] ParseFloatError), #[error("parse int error: {0}")] ParseInt(#[from] ParseIntError), #[error("pg error: {0}")] Pg(#[from] PgError), #[error("position not found")] PositionNotFound, #[error("schema not found")] SchemaNotFound, #[error("schema variant error: {0}")] SchemaVariant(#[from] Box<SchemaVariantError>), #[error("schema variant not found")] SchemaVariantNotFound, #[error("serde error: {0}")] Serde(#[from] serde_json::Error), #[error("si db error: {0}")] SiDb(#[from] si_db::Error), #[error("socket not found")] SocketNotFound, #[error("Transactions error: {0}")] Transactions(#[from] Box<TransactionsError>), #[error("could not acquire lock: {0}")] TryLock(#[from] tokio::sync::TryLockError), #[error("view category node not found")] ViewCategoryNotFound, #[error("view not found: {0}")] ViewNotFound(ViewId), #[error("view not found for geometry id: {0}")] ViewNotFoundForGeometry(GeometryId), #[error("Workspace error: {0}")] Workspace(#[from] Box<WorkspaceError>), #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] Box<WorkspaceSnapshotError>), } impl From<ApprovalRequirementError> for DiagramError { fn from(value: ApprovalRequirementError) -> Self { Box::new(value).into() } } impl From<AttributePrototypeError> for DiagramError { fn from(value: AttributePrototypeError) -> Self { Box::new(value).into() } } impl From<AttributePrototypeArgumentError> for DiagramError { fn from(value: AttributePrototypeArgumentError) -> Self { Box::new(value).into() } } impl From<AttributeValueError> for DiagramError { fn from(value: AttributeValueError) -> Self { Box::new(value).into() } } impl From<ChangeSetError> for DiagramError { fn from(value: ChangeSetError) -> Self { Box::new(value).into() } } impl From<ComponentError> for DiagramError { fn from(value: ComponentError) -> Self { Box::new(value).into() } } impl From<FuncError> for DiagramError { fn from(value: FuncError) -> Self { Box::new(value).into() } } impl From<HelperError> for DiagramError { fn from(value: HelperError) -> Self { Box::new(value).into() } } impl From<InputSocketError> for DiagramError { fn from(value: InputSocketError) -> Self { Box::new(value).into() } } impl From<NodeWeightError> for DiagramError { fn from(value: NodeWeightError) -> Self { Box::new(value).into() } } impl From<OutputSocketError> for DiagramError { fn from(value: OutputSocketError) -> Self { Box::new(value).into() } } impl From<SchemaVariantError> for DiagramError { fn from(value: SchemaVariantError) -> Self { Box::new(value).into() } } impl From<TransactionsError> for DiagramError { fn from(value: TransactionsError) -> Self { Box::new(value).into() } } impl From<WorkspaceError> for DiagramError { fn from(value: WorkspaceError) -> Self { Box::new(value).into() } } impl From<WorkspaceSnapshotError> for DiagramError { fn from(value: WorkspaceSnapshotError) -> Self { Box::new(value).into() } } pub type DiagramResult<T> = Result<T, DiagramError>; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all(serialize = "camelCase"))] pub struct SummaryDiagramManagementEdge { pub from_socket_id: String, pub to_socket_id: String, pub from_component_id: ComponentId, pub to_component_id: ComponentId, pub to_delete: bool, pub change_status: ChangeStatus, pub from_base_change_set: bool, } impl SummaryDiagramManagementEdge { pub fn new( from_schema_id: SchemaId, to_schema_id: SchemaId, from_component_id: ComponentId, to_component_id: ComponentId, ) -> Self { SummaryDiagramManagementEdge { from_socket_id: Self::output_socket_id(from_schema_id), to_socket_id: Self::input_socket_id(to_schema_id), from_component_id, to_component_id, to_delete: false, change_status: ChangeStatus::Added, from_base_change_set: false, } } pub fn new_removed( from_schema_id: SchemaId, to_schema_id: SchemaId, from_component_id: ComponentId, to_component_id: ComponentId, from_base_change_set: bool, ) -> Self { SummaryDiagramManagementEdge { from_socket_id: Self::output_socket_id(from_schema_id), to_socket_id: Self::input_socket_id(to_schema_id), from_component_id, to_component_id, to_delete: true, change_status: ChangeStatus::Deleted, from_base_change_set, } } pub fn output_socket_id(schema_id: SchemaId) -> String { format!("mgmt_output_{schema_id}") } pub fn input_socket_id(schema_id: SchemaId) -> String { format!("mgmt_input_{schema_id}") } } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all(serialize = "camelCase"))] pub struct SummaryDiagramAttributeSubscriptionEdge { pub from_component_id: ComponentId, pub from_attribute_path: String, pub to_component_id: ComponentId, pub to_attribute_value_id: AttributeValueId, pub to_attribute_path: String, // this is inferred by if either the to or from component is marked to_delete pub to_delete: bool, pub change_status: ChangeStatus, } struct ComponentInfo { component: Component, geometry: Option<Geometry>, schema_id: SchemaId, } type ComponentInfoCache = HashMap<ComponentId, ComponentInfo>; #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct Diagram { pub components: Vec<DiagramComponentView>, pub management_edges: Vec<SummaryDiagramManagementEdge>, pub attribute_subscription_edges: Vec<SummaryDiagramAttributeSubscriptionEdge>, pub views: Vec<ViewObjectView>, } pub struct DiagramComponentViews { pub component_views: Vec<DiagramComponentView>, pub management_edges: Vec<SummaryDiagramManagementEdge>, pub attribute_subscription_edges: Vec<SummaryDiagramAttributeSubscriptionEdge>, } impl Diagram { #[instrument(level = "info", skip_all)] async fn assemble_component_views( ctx: &DalContext, base_snapshot: &WorkspaceSnapshotSelector, components: &ComponentInfoCache, diagram_sockets: &mut HashMap<SchemaVariantId, Vec<DiagramSocket>>, ) -> DiagramResult<DiagramComponentViews> { let mut component_views = Vec::with_capacity(components.len()); let mut management_edges = Vec::with_capacity(components.len() / 2); let mut attribute_subscription_edges = Vec::with_capacity(components.len()); for ComponentInfo { component, geometry, schema_id, .. } in components.values() { let change_status = component.change_status(ctx).await?; component_views.push( component .into_frontend_type(ctx, geometry.as_ref(), change_status, diagram_sockets) .await?, ); let managers = component.managers(ctx).await?; for manager_id in managers { let Some(ComponentInfo { component: from_component, schema_id: from_schema_id, .. }) = components.get(&manager_id) else { continue; }; let change_status = if base_snapshot .find_edge( manager_id, component.id(), EdgeWeightKindDiscriminants::Manages, ) .await .is_none() { ChangeStatus::Added } else { ChangeStatus::Unmodified }; let mut management_edge = SummaryDiagramManagementEdge::new( *from_schema_id, *schema_id, manager_id, component.id(), ); management_edge.to_delete = from_component.to_delete() || component.to_delete(); management_edge.change_status = change_status; management_edges.push(management_edge); } let from_root_av_id = Component::root_attribute_value_id(ctx, component.id()).await?; let base_subscribers = Self::get_subscribers(base_snapshot, from_root_av_id).await; for subscriber in Self::get_subscribers(&ctx.workspace_snapshot()?, from_root_av_id).await { let (_, to_apa_id) = subscriber; let to_ap_id = AttributePrototypeArgument::prototype_id(ctx, to_apa_id).await?; if let Some(to_attribute_value_id) = AttributePrototype::attribute_value_id(ctx, to_ap_id).await? { let (root_id, to_attribute_path) = AttributeValue::path_from_root(ctx, to_attribute_value_id).await?; let to_component_id = AttributeValue::component_id(ctx, root_id).await?; if let Some(ComponentInfo { component: to_component, .. }) = components.get(&to_component_id) { let change_status = if base_subscribers.contains(&subscriber) { ChangeStatus::Unmodified } else { ChangeStatus::Added }; attribute_subscription_edges.push( SummaryDiagramAttributeSubscriptionEdge { from_component_id: component.id(), from_attribute_path: subscriber.0, to_component_id, to_attribute_value_id, to_attribute_path, change_status, to_delete: component.to_delete() || to_component.to_delete(), }, ); } } } } Ok(DiagramComponentViews { component_views, management_edges, attribute_subscription_edges, }) } async fn get_subscribers( snapshot: &WorkspaceSnapshotSelector, subscribed_to_av_id: AttributeValueId, ) -> HashSet<(String, AttributePrototypeArgumentId)> { let Ok(edges) = snapshot .edges_directed(subscribed_to_av_id, Direction::Incoming) .await else { return HashSet::new(); }; edges .into_iter() .filter_map(|(edge, source_ulid, _)| match edge.kind { EdgeWeightKind::ValueSubscription(path) => match path { AttributePath::JsonPointer(path) => Some((path, source_ulid.into())), }, _ => None, }) .collect() } #[instrument(level = "info", skip_all)] async fn get_base_snapshot( ctx: &DalContext, ) -> DiagramResult<(WorkspaceSnapshotSelector, bool)> { let base_change_set_id = if let Some(change_set_id) = ctx.change_set()?.base_change_set_id { change_set_id } else { return Ok((ctx.workspace_snapshot()?.clone(), false)); }; let workspace = Workspace::get_by_pk( ctx, ctx.tenancy() .workspace_pk_opt() .ok_or(WorkspaceSnapshotError::WorkspaceMissing)?, ) .await?; if workspace.default_change_set_id() == ctx.change_set_id() { return Ok((ctx.workspace_snapshot()?.clone(), false)); } match workspace.snapshot_kind() { WorkspaceSnapshotSelectorDiscriminants::SplitSnapshot => { let split_snapshot = SplitSnapshot::find_for_change_set(ctx, base_change_set_id).await?; Ok(( WorkspaceSnapshotSelector::SplitSnapshot(split_snapshot.into()), true, )) } WorkspaceSnapshotSelectorDiscriminants::LegacySnapshot => { let legacy_snapshot = WorkspaceSnapshot::find_for_change_set(ctx, base_change_set_id).await?; Ok(( WorkspaceSnapshotSelector::LegacySnapshot(legacy_snapshot.into()), true, )) } } } #[instrument(level = "info", skip_all)] async fn assemble_removed_components( ctx: &DalContext, base_snapshot: WorkspaceSnapshotSelector, maybe_view_id: Option<ViewId>, diagram_sockets: &mut HashMap<SchemaVariantId, Vec<DiagramSocket>>, ) -> DiagramResult<Vec<DiagramComponentView>> { let mut removed_component_summaries = vec![]; let components = Component::list_ids(ctx).await?; let base_change_set_ctx = ctx.clone_with_base().await?; let base_change_set_ctx = &base_change_set_ctx; if let Some(components_cat_id) = base_snapshot .get_category_node(CategoryNodeKind::Component) .await? { for component_id in base_snapshot .all_outgoing_targets(components_cat_id) .await? .iter() .map(|weight| weight.id()) { let component_id: ComponentId = component_id.into(); if !components.contains(&component_id) { let deleted_component = Component::get_by_id(base_change_set_ctx, component_id).await?; // If we get a view, try to get geometry, skip whole component if we don't find it // If we don't get a view, don't skip, geometry is None let maybe_geometry = if let Some(view_id) = maybe_view_id { let Some(geometry) = Geometry::try_get_by_component_and_view( base_change_set_ctx, component_id, view_id, ) .await? else { continue; }; Some(geometry) } else { None }; let mut summary_diagram_component = deleted_component .into_frontend_type( base_change_set_ctx, maybe_geometry.as_ref(), ChangeStatus::Deleted, diagram_sockets, ) .await?; summary_diagram_component.from_base_change_set = true; removed_component_summaries.push(summary_diagram_component); } } } Ok(removed_component_summaries) } /// If a manages edge is in the base snapshot, but not in the current /// snapshot, that means it has been deleted. If one of the components is /// not in this changeset, we can ignore the deleted edge, since we won't /// render it. If the components are restored from the base, the edge will /// *magically* reappear as deleted. #[instrument(level = "info", skip_all)] async fn assemble_removed_management_edges( ctx: &DalContext, base_snapshot: &WorkspaceSnapshotSelector, ) -> DiagramResult<Vec<SummaryDiagramManagementEdge>> { let mut removed_edges = vec![]; let workspace_snapshot = ctx.workspace_snapshot()?; let mut existing_management_edges = HashSet::new(); for component in Component::list(ctx).await? { if component.to_delete() { continue; } for source_id in workspace_snapshot .incoming_sources_for_edge_weight_kind( component.id(), EdgeWeightKindDiscriminants::Manages, ) .await? { let node_weight = workspace_snapshot.get_node_weight(source_id).await?; if let NodeWeight::Component(_) = &node_weight { existing_management_edges.insert((node_weight.id().into(), component.id())); } } } // list components let head_ctx = ctx.clone_with_head().await?; for from_id in Component::list_ids(&head_ctx).await? { let from_sv = Component::schema_variant_for_component_id(&head_ctx, from_id).await?; let from_schema_id = SchemaVariant::schema_id(&head_ctx, from_sv.id()).await?; for (_, _, to_id) in base_snapshot .edges_directed_for_edge_weight_kind( from_id, Direction::Outgoing, EdgeWeightKindDiscriminants::Manages, ) .await? { let to_sv = Component::schema_variant_for_component_id(&head_ctx, to_id.into()).await?; let to_schema_id = SchemaVariant::schema_id(&head_ctx, to_sv.id()).await?; if existing_management_edges.contains(&(from_id, to_id.into())) { continue; } removed_edges.push(SummaryDiagramManagementEdge::new_removed( from_schema_id, to_schema_id, from_id, to_id.into(), true, )); } } Ok(removed_edges) } /// If a subscription is in the base snapshot, but not in the current /// snapshot, that means it has been deleted. If one of the components is /// not in this changeset, we can ignore the deleted edge, since we won't /// render it. If the components are restored from the base, the edge will /// *magically* reappear as deleted. #[instrument(level = "info", skip_all)] async fn assemble_removed_attribute_subscription_edges( _ctx: &DalContext, _base_snapshot: &WorkspaceSnapshotSelector, ) -> DiagramResult<Vec<SummaryDiagramAttributeSubscriptionEdge>> { // TODO implement this Ok(vec![]) } /// Assemble a [`Diagram`](Self) based on existing [`Nodes`](crate::Node) and /// [`Connections`](crate::Connection). /// If passed a [ViewId], assemble it for that view only, otherwise, do it for the whole /// graph. #[instrument(level = "info", skip(ctx))] pub async fn assemble(ctx: &DalContext, maybe_view_id: Option<ViewId>) -> DiagramResult<Self> { let mut views = vec![]; let component_info_cache = { let mut map = HashMap::new(); if let Some(view_id) = maybe_view_id { for geometry in Geometry::list_by_view_id(ctx, view_id).await? { let geo_represents = match Geometry::represented_id(ctx, geometry.id()).await? { Some(r) => r, None => continue, }; match geo_represents { GeometryRepresents::Component(component_id) => { let component = Component::get_by_id(ctx, component_id).await?; let schema_id = component.schema(ctx).await?.id(); map.insert( component_id, ComponentInfo { component, geometry: Some(geometry), schema_id, }, ); } GeometryRepresents::View(view_id) => { let view = View::get_by_id(ctx, view_id).await?; let view_object_view = ViewObjectView::from_view_and_geometry( ctx, view, geometry.into_raw(), ) .await?; views.push(view_object_view); } } } } else { for component in Component::list(ctx).await? { let schema_id = component.schema(ctx).await?.id(); map.insert( component.id(), ComponentInfo { component, geometry: None, schema_id, }, ); } } map }; let (base_snapshot, not_on_head) = Self::get_base_snapshot(ctx).await?; let mut diagram_sockets = HashMap::new(); let mut diagram_component_views = Self::assemble_component_views( ctx, &base_snapshot, &component_info_cache, &mut diagram_sockets, ) .await?; if not_on_head { let removed_component_summaries = Self::assemble_removed_components( ctx, base_snapshot.clone(), maybe_view_id, &mut diagram_sockets, ) .await?; diagram_component_views .component_views .extend(removed_component_summaries); let removed_management_edges = Self::assemble_removed_management_edges(ctx, &base_snapshot).await?; diagram_component_views .management_edges .extend(removed_management_edges); let removed_attribute_subscription_edges = Self::assemble_removed_attribute_subscription_edges(ctx, &base_snapshot).await?; diagram_component_views .attribute_subscription_edges .extend(removed_attribute_subscription_edges); } Ok(Self { components: diagram_component_views.component_views, management_edges: diagram_component_views.management_edges, attribute_subscription_edges: diagram_component_views.attribute_subscription_edges, views, }) } /// Calls [Self::assemble](Self::assemble) for the default view. pub async fn assemble_for_default_view(ctx: &DalContext) -> DiagramResult<Self> { let default_view_id = View::get_id_for_default(ctx).await?; Self::assemble(ctx, Some(default_view_id)).await } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server