Skip to main content
Glama
incoming_connections.rs5.63 kB
use std::collections::VecDeque; use dal::{ AttributePrototype, AttributeValue, Component, DalContext, Prop, attribute::prototype::argument::{ AttributePrototypeArgument, value_source::ValueSource, }, }; use si_frontend_mv_types::incoming_connections::{ Connection, IncomingConnections as IncomingConnectionsMv, ManagementConnections as ManagementConnectionsMv, }; use si_id::ComponentId; use telemetry::prelude::*; use crate::Result; #[instrument( name = "dal_materialized_views.incoming_connections", level = "debug", skip_all )] pub async fn assemble(ctx: DalContext, component_id: ComponentId) -> Result<IncomingConnectionsMv> { let ctx = &ctx; let mut connections = prop_to_prop(ctx, component_id).await?; connections.sort(); Ok(IncomingConnectionsMv { id: component_id, connections, }) } #[instrument( name = "dal_materialized_views.outgoing_mgmt_connections", level = "debug", skip_all )] pub async fn assemble_management( ctx: DalContext, component_id: ComponentId, ) -> Result<ManagementConnectionsMv> { let ctx = &ctx; let mut connections = management(ctx, component_id).await?; connections.sort(); Ok(ManagementConnectionsMv { id: component_id, connections, }) } async fn prop_to_prop(ctx: &DalContext, component_id: ComponentId) -> Result<Vec<Connection>> { let mut connections = Vec::new(); let root_attribute_value_id = Component::root_attribute_value_id(ctx, component_id).await?; let mut work_queue = VecDeque::from([root_attribute_value_id]); while let Some(attribute_value_id) = work_queue.pop_front() { work_queue .extend(AttributeValue::get_child_av_ids_in_order(ctx, attribute_value_id).await?); let attribute_prototype_id = AttributeValue::prototype_id(ctx, attribute_value_id).await?; // Find all connections for the given attribute value. We'll collect everything into an // "in progress" cache to ensure that we don't perform unnecessary setup work if no // connections are found. let ap_args = AttributePrototype::list_arguments(ctx, attribute_prototype_id).await?; let mut in_progress = Vec::with_capacity(ap_args.len()); for attribute_prototype_argument_id in ap_args { if let Some(ValueSource::ValueSubscription(subscription)) = AttributePrototypeArgument::value_source_opt(ctx, attribute_prototype_argument_id) .await? { // If we can successfully resolve the subscription, we have found a connection! // Let's push the information we need into our "in progress" cache for later. if let Some(from_attribute_value_id) = subscription.resolve(ctx).await? { let (_, from_attribute_value_path) = AttributeValue::path_from_root(ctx, from_attribute_value_id).await?; let from_component_id = AttributeValue::component_id(ctx, from_attribute_value_id).await?; let from_prop_id = AttributeValue::prop_id(ctx, from_attribute_value_id).await?; let from_prop_path = Prop::path_by_id(ctx, from_prop_id) .await? .with_replaced_sep_and_prefix("/"); in_progress.push(( from_component_id, from_attribute_value_path, from_attribute_value_id, from_prop_id, from_prop_path, )) } } } // Only perform connections population setup if we have found connections. if !in_progress.is_empty() { let prop_id = AttributeValue::prop_id(ctx, attribute_value_id).await?; let prop_path = Prop::path_by_id(ctx, prop_id) .await? .with_replaced_sep_and_prefix("/"); let (_, attribute_value_path) = AttributeValue::path_from_root(ctx, attribute_value_id).await?; connections.reserve(in_progress.len()); for ( from_component_id, from_attribute_value_path, from_attribute_value_id, from_prop_id, from_prop_path, ) in in_progress { connections.push(Connection::Prop { from_component_id, from_attribute_value_id, from_attribute_value_path, from_prop_id, from_prop_path, to_component_id: component_id, to_prop_id: prop_id, to_prop_path: prop_path.clone(), to_attribute_value_id: attribute_value_id, to_attribute_value_path: attribute_value_path.clone(), }) } } } Ok(connections) } async fn management(ctx: &DalContext, component_id: ComponentId) -> Result<Vec<Connection>> { let component_managed_by_ids = Component::get_managed_by_id(ctx, component_id).await?; let mut connections = Vec::with_capacity(component_managed_by_ids.len()); for managed_component_id in component_managed_by_ids { connections.push(Connection::Management { from_component_id: component_id, to_component_id: managed_component_id, }) } Ok(connections) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server