Skip to main content
Glama
schema.rs22.6 kB
use std::{ collections::HashMap, fmt::Debug, sync::Arc, }; use leaf::{ LeafPrototype, LeafPrototypeError, }; use petgraph::Outgoing; use serde::{ Deserialize, Serialize, }; use si_events::{ ContentHash, Timestamp, audit_log::AuditLogKind, }; use si_id::{ ActionPrototypeId, LeafPrototypeId, ManagementPrototypeId, }; use si_layer_cache::LayerDbError; use telemetry::prelude::*; use thiserror::Error; use tokio::sync::TryLockError; pub use variant::{ SchemaVariant, SchemaVariantId, }; use crate::{ DalContext, FuncError, FuncId, HelperError, SchemaVariantError, TransactionsError, action::prototype::{ ActionPrototype, ActionPrototypeError, }, cached_module::{ CachedModule, CachedModuleError, }, change_set::ChangeSetError, implement_add_edge_to, layer_db_types::{ SchemaContent, SchemaContentDiscriminants, SchemaContentV1, }, management::prototype::{ ManagementPrototype, ManagementPrototypeError, }, pkg::{ ImportOptions, PkgError, import_pkg_from_pkg, }, workspace_snapshot::{ WorkspaceSnapshotError, content_address::{ ContentAddress, ContentAddressDiscriminants, }, edge_weight::{ EdgeWeight, EdgeWeightKind, EdgeWeightKindDiscriminants, }, node_weight::{ NodeWeight, NodeWeightError, category_node_weight::CategoryNodeKind, }, }, }; pub mod leaf; pub mod variant; pub const SCHEMA_VERSION: SchemaContentDiscriminants = SchemaContentDiscriminants::V1; #[remain::sorted] #[derive(Error, Debug)] pub enum SchemaError { #[error("action prototype error: {0}")] ActionPrototype(#[from] Box<ActionPrototypeError>), #[error("cached module error: {0}")] CachedModule(#[from] CachedModuleError), #[error("change set error: {0}")] ChangeSet(#[from] ChangeSetError), #[error("default schema variant not found for schema: {0}")] DefaultSchemaVariantNotFound(SchemaId), #[error("func error: {0}")] Func(#[from] FuncError), #[error("helper error: {0}")] Helper(#[from] HelperError), #[error("layer db error: {0}")] LayerDb(#[from] LayerDbError), #[error("leaf prototype error: {0}")] LeafPrototype(#[from] Box<LeafPrototypeError>), #[error("management prototype error: {0}")] ManagementPrototype(#[from] Box<ManagementPrototypeError>), #[error("No default schema variant exists for {0}")] NoDefaultSchemaVariant(SchemaId), #[error("node weight error: {0}")] NodeWeight(#[from] NodeWeightError), #[error("No schema variant with name {0}")] NoSchemaVariantWithName(String), #[error("pkg error: {0}")] Pkg(#[from] Box<PkgError>), #[error("No schema installed after successful package import for {0}")] SchemaNotInstalledAfterImport(SchemaId), #[error("schema variant error: {0}")] SchemaVariant(#[from] Box<SchemaVariantError>), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("try lock error: {0}")] TryLock(#[from] TryLockError), #[error("uninstalled schema {0} not found")] UninstalledSchemaNotFound(SchemaId), #[error("uninstalled schema {0} not found")] UninstalledSchemaNotFoundByName(String), #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] WorkspaceSnapshotError), } impl From<PkgError> for SchemaError { fn from(value: PkgError) -> Self { Box::new(value).into() } } impl From<SchemaVariantError> for SchemaError { fn from(value: SchemaVariantError) -> Self { Box::new(value).into() } } pub type SchemaResult<T> = Result<T, SchemaError>; pub use si_id::SchemaId; #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct Schema { id: SchemaId, #[serde(flatten)] timestamp: Timestamp, pub name: String, pub ui_hidden: bool, pub is_builtin: bool, } impl From<Schema> for SchemaContentV1 { fn from(value: Schema) -> Self { Self { timestamp: value.timestamp, name: value.name, ui_hidden: value.ui_hidden, is_builtin: value.is_builtin, } } } impl Schema { pub fn assemble(id: SchemaId, inner: SchemaContentV1) -> Self { Self { id, timestamp: inner.timestamp, name: inner.name, ui_hidden: inner.ui_hidden, is_builtin: inner.is_builtin, } } pub fn id(&self) -> SchemaId { self.id } pub fn name(&self) -> &str { &self.name } pub fn is_builtin(&self) -> bool { self.is_builtin } pub fn ui_hidden(&self) -> bool { self.ui_hidden } implement_add_edge_to!( source_id: SchemaId, destination_id: SchemaVariantId, add_fn: add_edge_to_variant, discriminant: EdgeWeightKindDiscriminants::Use, result: SchemaResult, ); implement_add_edge_to!( source_id: SchemaId, destination_id: ActionPrototypeId, add_fn: add_edge_to_action_prototype, discriminant: EdgeWeightKindDiscriminants::ActionPrototype, result: SchemaResult, ); implement_add_edge_to!( source_id: SchemaId, destination_id: ManagementPrototypeId, add_fn: add_edge_to_management_prototype, discriminant: EdgeWeightKindDiscriminants::ManagementPrototype, result: SchemaResult, ); implement_add_edge_to!( source_id: SchemaId, destination_id: LeafPrototypeId, add_fn: add_edge_to_leaf_prototype, discriminant: EdgeWeightKindDiscriminants::LeafPrototype, result: SchemaResult, ); pub async fn new(ctx: &DalContext, name: impl Into<String>) -> SchemaResult<Self> { let id = ctx.workspace_snapshot()?.generate_ulid().await?; Self::new_with_id(ctx, id.into(), name).await } pub async fn new_with_id( ctx: &DalContext, id: SchemaId, name: impl Into<String>, ) -> SchemaResult<Self> { let content = SchemaContentV1 { timestamp: Timestamp::now(), name: name.into(), ui_hidden: false, is_builtin: false, }; let (hash, _) = ctx.layer_db().cas().write( Arc::new(SchemaContent::V1(content.clone()).into()), None, ctx.events_tenancy(), ctx.events_actor(), )?; let workspace_snapshot = ctx.workspace_snapshot()?; // Lineage id has to match id here, otherwise every new schema will be // treated as a new node, even though it has the same id let node_weight = NodeWeight::new_content(id.into(), id.into(), ContentAddress::Schema(hash)); workspace_snapshot.add_or_replace_node(node_weight).await?; let schema_category_index_id = workspace_snapshot .get_category_node_or_err(CategoryNodeKind::Schema) .await?; workspace_snapshot .add_edge( schema_category_index_id, EdgeWeight::new(EdgeWeightKind::new_use()), id, ) .await?; Ok(Self::assemble(id, content)) } pub async fn all_overlay_func_ids( ctx: &DalContext, schema_id: SchemaId, ) -> SchemaResult<Vec<FuncId>> { let mut func_ids = vec![]; let action_prototypes = ActionPrototype::for_schema(ctx, schema_id) .await .map_err(Box::new)?; for action_proto in action_prototypes { let func_id = ActionPrototype::func_id(ctx, action_proto.id()) .await .map_err(Box::new)?; func_ids.push(func_id); } let management_prototypes = ManagementPrototype::list_for_schema_id(ctx, schema_id) .await .map_err(Box::new)?; for management_proto in management_prototypes { let func_id = ManagementPrototype::func_id(ctx, management_proto.id()) .await .map_err(Box::new)?; func_ids.push(func_id); } let leaf_prototypes = LeafPrototype::for_schema(ctx, schema_id) .await .map_err(Box::new)?; for leaf_proto in leaf_prototypes { let func_id = LeafPrototype::func_id(ctx, leaf_proto.id()) .await .map_err(Box::new)?; func_ids.push(func_id); } Ok(func_ids) } pub async fn default_variant_id( ctx: &DalContext, schema_id: SchemaId, ) -> SchemaResult<SchemaVariantId> { Self::default_variant_id_opt(ctx, schema_id) .await? .ok_or(SchemaError::DefaultSchemaVariantNotFound(schema_id)) } pub async fn default_variant_id_opt( ctx: &DalContext, schema_id: SchemaId, ) -> SchemaResult<Option<SchemaVariantId>> { let workspace_snapshot = ctx.workspace_snapshot()?; let default_schema_variant_node_indicies = workspace_snapshot .edges_directed(schema_id, Outgoing) .await?; for (edge_weight, _, target_index) in default_schema_variant_node_indicies { if *edge_weight.kind() == EdgeWeightKind::new_use_default() { return Ok(Some( workspace_snapshot .get_node_weight(target_index) .await? .id() .into(), )); } } Ok(None) } /// This method returns all [`SchemaVariantIds`](SchemaVariant) that are used by the [`Schema`] /// corresponding to the [`SchemaId`](Schema) passed in. This method will also include the /// default [`SchemaVariantId`](SchemaVariantId), if one exists. pub async fn list_schema_variant_ids( ctx: &DalContext, schema_id: SchemaId, ) -> SchemaResult<Vec<SchemaVariantId>> { let workspace_snapshot = ctx.workspace_snapshot()?; let schema_variant_ids: Vec<_> = workspace_snapshot .edges_directed_for_edge_weight_kind( schema_id, Outgoing, EdgeWeightKindDiscriminants::Use, ) .await? .into_iter() .map(|(_, _, target_id)| target_id.into()) .collect(); Ok(schema_variant_ids) } pub async fn set_default_variant_id( &self, ctx: &DalContext, schema_variant_id: SchemaVariantId, ) -> SchemaResult<()> { let workspace_snapshot = ctx.workspace_snapshot()?; debug!( %schema_variant_id, "setting the default schema variant for schema: {}", self.id ); // Our system will have edges as follows: // // Schema -> Use -> Schema Variant // // In order to make a schema variant the default for a schema, we need // to update the correct edge from Use to the default variant of Use, // // Schema -> Use {is_default = true} -> Schema Variant // // Therefore, when we are setting a default schema variant we need to // find any existing default Use edges and convert them back to uses AND we // need to find the existing Use edge between our nodes and change that // to be a default Use for (edge_weight, source_index, target_index) in workspace_snapshot .edges_directed_for_edge_weight_kind( self.id, Outgoing, EdgeWeightKind::new_use_default().into(), ) .await? { // We have found the existing Default edge between schema and schema variant // we now need to update that edge to be a Use workspace_snapshot .remove_edge(source_index, target_index, edge_weight.kind().into()) .await?; Self::add_edge_to_variant( ctx, self.id, workspace_snapshot .get_node_weight(target_index) .await? .id() .into(), EdgeWeightKind::new_use(), ) .await?; } workspace_snapshot .remove_edge(self.id, schema_variant_id, EdgeWeightKind::new_use().into()) .await?; Self::add_edge_to_variant( ctx, self.id, schema_variant_id, EdgeWeightKind::new_use_default(), ) .await?; Ok(()) } /// Returns whether or not the [`Schema`] exists locally. This works because /// [`Schemas`](Schema) are unique across workspaces. pub async fn exists_locally(ctx: &DalContext, id: SchemaId) -> SchemaResult<bool> { Ok(ctx.workspace_snapshot()?.node_exists(id).await) } pub async fn get_by_id_opt(ctx: &DalContext, id: SchemaId) -> SchemaResult<Option<Self>> { let workspace_snapshot = ctx.workspace_snapshot()?; let Some(node_weight) = workspace_snapshot.get_node_weight_opt(id).await else { return Ok(None); }; let hash = node_weight.content_hash(); let content: SchemaContent = ctx .layer_db() .cas() .try_read_as(&hash) .await? .ok_or(WorkspaceSnapshotError::MissingContentFromStore(id.into()))?; // NOTE(nick,jacob,zack): if we had a v2, then there would be migration logic here. let SchemaContent::V1(inner) = content; Ok(Some(Self::assemble(id, inner))) } pub async fn get_by_id(ctx: &DalContext, id: SchemaId) -> SchemaResult<Self> { let workspace_snapshot = ctx.workspace_snapshot()?; let node_weight = workspace_snapshot.get_node_weight(id).await?; let hash = node_weight.content_hash(); let content: SchemaContent = ctx .layer_db() .cas() .try_read_as(&hash) .await? .ok_or(WorkspaceSnapshotError::MissingContentFromStore(id.into()))?; // NOTE(nick,jacob,zack): if we had a v2, then there would be migration logic here. let SchemaContent::V1(inner) = content; Ok(Self::assemble(id, inner)) } pub async fn modify<L>(self, ctx: &DalContext, lambda: L) -> SchemaResult<Self> where L: FnOnce(&mut Self) -> SchemaResult<()>, { let mut schema = self; let before = SchemaContentV1::from(schema.clone()); lambda(&mut schema)?; let updated = SchemaContentV1::from(schema.clone()); if updated != before { let (hash, _) = ctx.layer_db().cas().write( Arc::new(SchemaContent::V1(updated.clone()).into()), None, ctx.events_tenancy(), ctx.events_actor(), )?; ctx.workspace_snapshot()? .update_content(schema.id.into(), hash) .await?; } Ok(schema) } pub async fn list(ctx: &DalContext) -> SchemaResult<Vec<Self>> { let workspace_snapshot = ctx.workspace_snapshot()?; let mut schemas = vec![]; let schema_category_index_id = workspace_snapshot .get_category_node_or_err(CategoryNodeKind::Schema) .await?; let schema_node_indices = workspace_snapshot .outgoing_targets_for_edge_weight_kind( schema_category_index_id, EdgeWeightKindDiscriminants::Use, ) .await?; let mut schema_node_weights = vec![]; let mut schema_content_hashes = vec![]; for index in schema_node_indices { let node_weight = workspace_snapshot .get_node_weight(index) .await? .get_content_node_weight_of_kind(ContentAddressDiscriminants::Schema)?; schema_content_hashes.push(node_weight.content_hash()); schema_node_weights.push(node_weight); } let schema_contents: HashMap<ContentHash, SchemaContent> = ctx .layer_db() .cas() .try_read_many_as(schema_content_hashes.as_slice()) .await?; for node_weight in schema_node_weights { match schema_contents.get(&node_weight.content_hash()) { Some(content) => { // NOTE(nick,jacob,zack): if we had a v2, then there would be migration logic here. let SchemaContent::V1(inner) = content; schemas.push(Self::assemble(node_weight.id().into(), inner.to_owned())); } None => Err(WorkspaceSnapshotError::MissingContentFromStore( node_weight.id(), ))?, } } Ok(schemas) } /// Lists all [`Schemas`](Schema) by ID in the workspace. pub async fn list_ids(ctx: &DalContext) -> SchemaResult<Vec<SchemaId>> { let workspace_snapshot = ctx.workspace_snapshot()?; let schema_category_index_id = workspace_snapshot .get_category_node_or_err(CategoryNodeKind::Schema) .await?; let schema_node_indices = workspace_snapshot .outgoing_targets_for_edge_weight_kind( schema_category_index_id, EdgeWeightKindDiscriminants::Use, ) .await?; let mut schema_ids = Vec::with_capacity(schema_node_indices.len()); for index in schema_node_indices { let raw_id = workspace_snapshot.get_node_weight(index).await?.id(); schema_ids.push(raw_id.into()); } Ok(schema_ids) } // NOTE(nick): this assumes that schema names are unique. pub async fn get_by_name_opt( ctx: &DalContext, name: impl AsRef<str>, ) -> SchemaResult<Option<Self>> { let workspace_snapshot = ctx.workspace_snapshot()?; let schema_node_indices = { let schema_category_index_id = workspace_snapshot .get_category_node_or_err(CategoryNodeKind::Schema) .await?; workspace_snapshot .outgoing_targets_for_edge_weight_kind( schema_category_index_id, EdgeWeightKindDiscriminants::Use, ) .await? }; // NOTE(nick): this algorithm could be better. for schema_node_index in schema_node_indices { let schema_node_weight = { workspace_snapshot .get_node_weight(schema_node_index) .await? .get_content_node_weight_of_kind(ContentAddressDiscriminants::Schema)? }; let schema = Self::get_by_id(ctx, schema_node_weight.id().into()).await?; if schema.name.eq_ignore_ascii_case(name.as_ref()) { return Ok(Some(schema)); } } Ok(None) } pub async fn get_by_name(ctx: &DalContext, name: impl AsRef<str>) -> SchemaResult<Self> { Self::get_by_name_opt(ctx, name.as_ref()) .await? .ok_or_else(|| SchemaError::NoSchemaVariantWithName(name.as_ref().to_string())) } pub async fn get_or_install_by_name(ctx: &DalContext, name: &str) -> SchemaResult<Schema> { // If there's an installed schema, return it Ok(match Self::get_by_name_opt(ctx, name).await? { Some(schema) => schema, None => { let uninstalled_module = CachedModule::find_latest_for_schema_name(ctx, name) .await? .ok_or(SchemaError::UninstalledSchemaNotFoundByName(name.into()))?; Self::install_from_module(ctx, uninstalled_module).await? } }) } pub async fn is_name_taken(ctx: &DalContext, name: &String) -> SchemaResult<bool> { Ok(Self::list(ctx).await?.iter().any(|s| s.name.eq(name))) } /// Returns the default [`SchemaVariantId`] for the provided [`SchemaId`] /// *if* this schema is installed. If this schema is not installed, it looks /// for it in the local module cache, and if it exists there, it installs it, then /// returns the newly installed default [`SchemaVariantId`]. #[instrument( name = "schema.get_or_install_default_variant", level = "info", skip_all )] pub async fn get_or_install_default_variant( ctx: &DalContext, schema_id: SchemaId, ) -> SchemaResult<SchemaVariantId> { Self::ensure_installed(ctx, schema_id).await?; Self::default_variant_id(ctx, schema_id).await } async fn ensure_installed(ctx: &DalContext, schema_id: SchemaId) -> SchemaResult<()> { // Install the schema, if it isn't already if !Self::exists_locally(ctx, schema_id).await? { let module = CachedModule::find_latest_for_schema_id(ctx, schema_id) .await? .ok_or(SchemaError::UninstalledSchemaNotFound(schema_id))?; Self::install_from_module(ctx, module.clone()).await?; let variant_id = Self::default_variant_id(ctx, schema_id).await?; ctx.write_audit_log( AuditLogKind::InstallSchemaAndVariant { schema_id, schema_variant_id: variant_id, schema_variant_display_name: module.schema_name.clone(), }, module.schema_name, ) .await?; } Ok(()) } #[instrument(name = "schema.install_from_module", level = "info", skip_all)] async fn install_from_module( ctx: &DalContext, mut module: CachedModule, ) -> SchemaResult<Schema> { let si_pkg = module.si_pkg(ctx).await?; import_pkg_from_pkg( ctx, &si_pkg, Some(ImportOptions { schema_id: Some(module.schema_id.into()), ..Default::default() }), ) .await?; Self::get_by_id_opt(ctx, module.schema_id) .await? .ok_or(SchemaError::UninstalledSchemaNotFound(module.schema_id)) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server