Skip to main content
Glama
leaf.rs15 kB
use std::{ collections::BTreeMap, sync::Arc, }; use serde_json::json; use si_id::{ AttributeValueId, FuncId, LeafPrototypeId, SchemaId, ulid::Ulid, }; use si_layer_cache::LayerDbError; use thiserror::Error; use tokio::sync::RwLock; use super::{ Schema, SchemaError, }; use crate::{ AttributeValue, DalContext, EdgeWeightKind, EdgeWeightKindDiscriminants, Func, FuncError, HelperError, WorkspaceSnapshotError, attribute::{ path::AttributePath, value::{ AttributeValueError, PrototypeExecution, write_values_to_cas, }, }, func::{ leaf::{ LeafInputLocation, LeafKind, }, runner::{ FuncRunner, FuncRunnerError, }, }, implement_add_edge_to, layer_db_types::{ AttributePathsContent, AttributePathsContentV1, ContentTypes, }, workspace_snapshot::{ content_address::{ ContentAddress, ContentAddressDiscriminants, }, node_weight::{ NodeWeight, NodeWeightError, category_node_weight::CategoryNodeKind, leaf_prototype_node_weight::LeafPrototypeNodeWeight, }, }, }; #[derive(Error, Debug)] pub enum LeafPrototypeError { #[error("attribute value error: {0}")] AttributeValue(#[from] Box<AttributeValueError>), #[error("component with root av id {0} is missing a a leaf destination at {1}")] ComponentMissingDestinationAv(AttributeValueId, String), #[error("func error: {0}")] Func(#[from] FuncError), #[error("func runner error: {0}")] FuncRunner(#[from] FuncRunnerError), #[error("Helper error: {0}")] Helper(#[from] HelperError), #[error("layer db error: {0}")] LayerDb(#[from] LayerDbError), #[error("leaf prototype {0} has no function")] LeafPrototypeHasNoFunc(LeafPrototypeId), #[error("leaf prototype has no inputs at content address: {0:?}")] LeafPrototypeHasNoInputsAtContentAddress(ContentAddress), #[error("node weight error: {0}")] NodeWeight(#[from] NodeWeightError), #[error("schema error: {0}")] Schema(#[from] SchemaError), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("workspace snapshot error: {0}")] WorkspaceSnapshotError(#[from] WorkspaceSnapshotError), } pub type LeafPrototypeResult<T> = Result<T, LeafPrototypeError>; /// Leaf prototypes are schema level ("overlay") equivalents of Attribute /// functions, with the exception that they have predefined output locations /// (defined by their "LeafKind"). Currently there are only Qualifications and /// CodeGenerations. #[derive(Debug, Clone, Eq, PartialEq)] pub struct LeafPrototype { id: LeafPrototypeId, inputs: Vec<AttributePath>, kind: LeafKind, } impl LeafPrototype { pub fn id(&self) -> LeafPrototypeId { self.id } pub fn inputs(&self) -> &[AttributePath] { &self.inputs } pub fn leaf_inputs(&self) -> impl Iterator<Item = LeafInputLocation> { self.inputs().iter().filter_map(|path| path.into()) } pub fn kind(&self) -> LeafKind { self.kind } pub async fn new( ctx: &DalContext, schema_id: SchemaId, kind: LeafKind, inputs: &[LeafInputLocation], func_id: FuncId, ) -> LeafPrototypeResult<Self> { let id = ctx.workspace_snapshot()?.generate_ulid().await?; let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?; let attribute_paths: Vec<AttributePath> = inputs.iter().copied().map(Into::into).collect(); let (content_hash, _) = ctx.layer_db().cas().write( Arc::new(crate::layer_db_types::ContentTypes::AttributePaths( attribute_paths.clone().into(), )), None, ctx.events_tenancy(), ctx.events_actor(), )?; let node_weight = NodeWeight::LeafPrototype(LeafPrototypeNodeWeight::new( id, lineage_id, kind, content_hash, )); let snap = ctx.workspace_snapshot()?; snap.add_or_replace_node(node_weight).await?; let leaf_prototype_id = id.into(); Schema::add_edge_to_leaf_prototype( ctx, schema_id, leaf_prototype_id, EdgeWeightKind::LeafPrototype, ) .await?; Self::add_edge_to_func(ctx, leaf_prototype_id, func_id, EdgeWeightKind::new_use()).await?; let overlay_category_id = ctx .workspace_snapshot()? .get_or_create_static_category_node(CategoryNodeKind::Overlays) .await?; Self::add_overlay_category_edge( ctx, overlay_category_id, leaf_prototype_id, EdgeWeightKind::new_use(), ) .await?; Ok(Self { id: leaf_prototype_id, inputs: attribute_paths, kind, }) } pub async fn get_by_id(ctx: &DalContext, id: LeafPrototypeId) -> LeafPrototypeResult<Self> { let node_weight = ctx .workspace_snapshot()? .get_node_weight(id) .await? .get_leaf_prototype_node_weight()?; let Some(ContentTypes::AttributePaths(AttributePathsContent::V1(AttributePathsContentV1( inputs, )))) = ctx .layer_db() .cas() .try_read_as(&node_weight.inputs().content_hash()) .await? else { return Err( LeafPrototypeError::LeafPrototypeHasNoInputsAtContentAddress(node_weight.inputs()), ); }; let kind = node_weight.kind(); Ok(Self { id, inputs, kind }) } pub async fn func_id( ctx: &DalContext, leaf_prototype_id: LeafPrototypeId, ) -> LeafPrototypeResult<FuncId> { let snap = ctx.workspace_snapshot()?; Ok(snap .outgoing_targets_for_edge_weight_kind( leaf_prototype_id, EdgeWeightKindDiscriminants::Use, ) .await? .pop() .ok_or(LeafPrototypeError::LeafPrototypeHasNoFunc( leaf_prototype_id, ))? .into()) } pub async fn schemas( ctx: &DalContext, leaf_prototype_id: LeafPrototypeId, ) -> LeafPrototypeResult<Vec<SchemaId>> { let snap = ctx.workspace_snapshot()?; let mut result = vec![]; for schema_id in snap .incoming_sources_for_edge_weight_kind( leaf_prototype_id, EdgeWeightKindDiscriminants::LeafPrototype, ) .await? { let NodeWeight::Content(content_inner) = snap.get_node_weight(schema_id).await? else { continue; }; if content_inner.content_address_discriminants() == ContentAddressDiscriminants::Schema { result.push(schema_id.into()); } } Ok(result) } pub async fn for_schema( ctx: &DalContext, schema_id: SchemaId, ) -> LeafPrototypeResult<Vec<LeafPrototype>> { let mut result = vec![]; for leaf_prototype_id in ctx .workspace_snapshot()? .outgoing_targets_for_edge_weight_kind( schema_id, EdgeWeightKindDiscriminants::LeafPrototype, ) .await? { let prototype = LeafPrototype::get_by_id(ctx, leaf_prototype_id.into()).await?; result.push(prototype); } Ok(result) } pub async fn attach_to_schema( &self, ctx: &DalContext, schema_id: SchemaId, ) -> LeafPrototypeResult<()> { Schema::add_edge_to_leaf_prototype( ctx, schema_id, self.id(), EdgeWeightKind::LeafPrototype, ) .await?; Ok(()) } pub async fn for_func( ctx: &DalContext, func_id: FuncId, ) -> LeafPrototypeResult<Vec<LeafPrototype>> { let snap = ctx.workspace_snapshot()?; let mut result = vec![]; for incoming_source in snap .incoming_sources_for_edge_weight_kind(func_id, EdgeWeightKindDiscriminants::Use) .await? { let weight = snap.get_node_weight(incoming_source).await?; if let NodeWeight::LeafPrototype(_) = weight { let proto = LeafPrototype::get_by_id(ctx, incoming_source.into()).await?; result.push(proto); } } Ok(result) } pub async fn resolve_inputs( &self, ctx: &DalContext, root_attribute_value_id: AttributeValueId, ) -> LeafPrototypeResult<Vec<AttributeValueId>> { let mut result = vec![]; for path in self.inputs() { if let Some(av_id) = path .resolve(ctx, root_attribute_value_id) .await .map_err(Box::new)? { result.push(av_id); } } Ok(result) } /// Pairs each input with its corresponding LeafInputLocation pub async fn resolve_inputs_with_leaf_input_locations( &self, ctx: &DalContext, root_attribute_value_id: AttributeValueId, ) -> LeafPrototypeResult<Vec<(LeafInputLocation, AttributeValueId)>> { let mut result = vec![]; for path in self.inputs() { let Some(av_id) = path .resolve(ctx, root_attribute_value_id) .await .map_err(Box::new)? else { continue; }; let Some(leaf_input) = path.into() else { continue; }; result.push((leaf_input, av_id)); } Ok(result) } pub async fn resolve_output_map( &self, ctx: &DalContext, root_attribute_value_id: AttributeValueId, ) -> LeafPrototypeResult<AttributeValueId> { let path = self.kind().map_path(); let Some(output_map_id) = path .resolve(ctx, root_attribute_value_id) .await .map_err(Box::new)? else { return Err(LeafPrototypeError::ComponentMissingDestinationAv( root_attribute_value_id, path.to_string(), )); }; Ok(output_map_id) } pub async fn resolve_output_element( &self, ctx: &DalContext, root_attribute_value_id: AttributeValueId, ) -> LeafPrototypeResult<Option<AttributeValueId>> { let output_map_id = self .resolve_output_map(ctx, root_attribute_value_id) .await?; let func = Func::get_by_id(ctx, Self::func_id(ctx, self.id).await?).await?; Ok( AttributeValue::map_child_opt(ctx, output_map_id, &func.name) .await .map_err(Box::new)?, ) } pub async fn execute( ctx: &DalContext, leaf_prototype_id: LeafPrototypeId, output_map_id: AttributeValueId, component_root_attribute_value_id: AttributeValueId, read_lock: Arc<RwLock<()>>, ) -> LeafPrototypeResult<PrototypeExecution> { let read_guard = read_lock.read().await; let prototype = Self::get_by_id(ctx, leaf_prototype_id).await?; let func_id = Self::func_id(ctx, leaf_prototype_id).await?; // We already did this in the DVU, consider stashing them somewhere let inputs = prototype .resolve_inputs_with_leaf_input_locations(ctx, component_root_attribute_value_id) .await?; let mut func_args: BTreeMap<String, serde_json::Value> = BTreeMap::new(); let mut input_attribute_value_ids = vec![]; for (leaf_input, av_id) in &inputs { let arg_name = leaf_input.arg_name(); let value = AttributeValue::view(ctx, *av_id) .await .map_err(Box::new)? .unwrap_or(serde_json::Value::Null); func_args.insert(arg_name.to_string(), value); input_attribute_value_ids.push(*av_id); } let args = serde_json::to_value(func_args)?; let result_channel = FuncRunner::run_attribute_value(ctx, output_map_id, func_id, args).await?; drop(read_guard); let mut func_run_value = result_channel .await .map_err(|_| Box::new(AttributeValueError::FuncRunnerSend))??; if func_run_value.unprocessed_value().is_some() { func_run_value.set_processed_value(Some(json!({}))); } else { func_run_value.set_processed_value(None) }; let (unprocessed_value_address, value_address) = write_values_to_cas(ctx, &func_run_value) .await .map_err(Box::new)?; FuncRunner::update_run(ctx, func_run_value.func_run_id(), |func_run| { func_run.set_success(unprocessed_value_address, value_address); }) .await?; Ok(PrototypeExecution { func_run_value, func: Func::get_by_id(ctx, func_id).await?, input_attribute_value_ids, value_id: output_map_id, }) } pub async fn remove(ctx: &DalContext, id: LeafPrototypeId) -> LeafPrototypeResult<()> { ctx.workspace_snapshot()?.remove_node_by_id(id).await?; Ok(()) } pub async fn update_inputs( ctx: &DalContext, id: LeafPrototypeId, new_inputs: &[LeafInputLocation], ) -> LeafPrototypeResult<()> { let snap = ctx.workspace_snapshot()?; let attribute_paths: Vec<AttributePath> = new_inputs.iter().copied().map(Into::into).collect(); let (content_hash, _) = ctx.layer_db().cas().write( Arc::new(crate::layer_db_types::ContentTypes::AttributePaths( attribute_paths.clone().into(), )), None, ctx.events_tenancy(), ctx.events_actor(), )?; let mut weight = snap.get_node_weight(id).await?; weight.new_content_hash(content_hash)?; snap.add_or_replace_node(weight).await?; Ok(()) } implement_add_edge_to!( source_id: LeafPrototypeId, destination_id: FuncId, add_fn: add_edge_to_func, discriminant: EdgeWeightKindDiscriminants::Use, result: LeafPrototypeResult, ); implement_add_edge_to!( source_id: Ulid, destination_id: LeafPrototypeId, add_fn: add_overlay_category_edge, discriminant: EdgeWeightKindDiscriminants::Use, result: LeafPrototypeResult, ); }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server