Skip to main content
Glama
deployment.rs13.8 kB
use si_data_nats::{ Subject, async_nats::jetstream::kv::{ CreateErrorKind, Watch, }, }; use si_frontend_mv_types::{ definition_checksum::materialized_view_definition_checksums, index::deployment::{ DeploymentIndexPointerValueV2, DeploymentIndexPointerVersion, }, object::FrontendObject, reference::{ IndexReference, ReferenceKind, }, }; use telemetry::prelude::*; use crate::{ Domain, Error, FriggError, FriggStore, KvRevision, Result, Scope, }; impl FriggStore { #[instrument( name = "frigg.insert_deployment_object", level = "debug", skip_all, fields( si.frontend_object.id = %object.id, si.frontend_object.kind = %object.kind, ) )] pub async fn insert_deployment_object(&self, object: &FrontendObject) -> Result<Subject> { let key = Self::deployment_object_key(&object.kind, &object.id, &object.checksum); let value = serde_json::to_vec(&object).map_err(Error::Serialize)?; if let Err(err) = self.store.create(key.as_str(), value.into()).await { if !matches!(err.kind(), CreateErrorKind::AlreadyExists) { return Err(Error::Create(err)); } }; Ok(key) } #[instrument( name = "frigg.get_deployment_object", level = "debug", skip_all, fields( si.frontend_object.id = %id, si.frontend_object.kind = %kind, si.frontend_object.checksum = %checksum, ))] pub async fn get_deployment_object( &self, kind: &str, id: &str, checksum: &str, ) -> Result<Option<FrontendObject>> { match self .get_object_raw_bytes(&Self::deployment_object_key(kind, id, checksum)) .await? { Some((bytes, _)) => Ok(Some( serde_json::from_slice(bytes.as_ref()).map_err(Error::Deserialize)?, )), None => Ok(None), } } #[instrument( name = "frigg.get_current_deployment_object", level = "debug", skip_all, fields( si.frontend_object.id = %id, si.frontend_object.kind = %kind, ))] pub async fn get_current_deployment_object( &self, kind: &str, id: &str, ) -> Result<Option<FrontendObject>> { let maybe_mv_index = self.get_deployment_index().await?.map(|r| r.0); self.get_current_deployment_object_with_index(kind, id, maybe_mv_index) .await } #[instrument( name = "frigg.get_current_deployment_object_with_index", level = "debug", skip_all, fields( si.frontend_object.id = %id, si.frontend_object.kind = %kind, ) )] pub async fn get_current_deployment_object_with_index( &self, kind: &str, id: &str, maybe_mv_index: Option<FrontendObject>, ) -> Result<Option<FrontendObject>> { let Some(current_index) = maybe_mv_index else { return Ok(None); }; let mv_list = Self::mv_list_from_deployment_mv_index_version_data(current_index.data)?; for index_entry in mv_list { if index_entry.kind == kind && index_entry.id == id { return Ok(Some( self.get_deployment_object(kind, id, &index_entry.checksum) .await? .ok_or_else(|| FriggError::ObjectNotFoundForDeploymentIndex { kind: kind.to_string(), id: id.to_string(), })?, )); } } Ok(None) } pub async fn get_current_deployment_object_with_mvlist( &self, kind: &str, id: &str, mv_list: &[IndexReference], ) -> Result<Option<FrontendObject>> { for index_entry in mv_list { if index_entry.kind == kind && index_entry.id == id { return Ok(Some( self.get_deployment_object(kind, id, &index_entry.checksum) .await? .ok_or_else(|| FriggError::ObjectNotFoundForDeploymentIndex { kind: kind.to_string(), id: id.to_string(), })?, )); } } Ok(None) } #[instrument( name = "frigg.get_current_deployment_objects_by_kind", level = "debug", skip_all, fields( si.frontend_object.kind = %kind, ))] pub async fn get_current_deployment_objects_by_kind( &self, kind: &str, ) -> Result<Vec<FrontendObject>> { let Some((current_index, _)) = self.get_deployment_index().await? else { return Ok(Vec::new()); }; let mv_list = Self::mv_list_from_deployment_mv_index_version_data(current_index.data)?; let mut objects = Vec::new(); for index_entry in mv_list { if index_entry.kind == kind { if let Ok(Some(obj)) = self .get_deployment_object(kind, &index_entry.id, &index_entry.checksum) .await { objects.push(obj); } } } Ok(objects) } pub fn mv_list_from_deployment_mv_index_version_data( versioned_index_data: serde_json::Value, ) -> Result<Vec<IndexReference>> { Ok( match serde_json::from_value::< si_frontend_mv_types::index::deployment::DeploymentMvIndexVersion, >(versioned_index_data) .map_err(FriggError::Deserialize)? { si_frontend_mv_types::index::deployment::DeploymentMvIndexVersion::V1(v1_index) => { v1_index.mv_list } si_frontend_mv_types::index::deployment::DeploymentMvIndexVersion::V2(v2_index) => { v2_index.mv_list } }, ) } async fn insert_or_update_deployment_index_preamble( &self, object: &FrontendObject, ) -> Result<(Subject, Subject)> { let mv_index_kind_string = ReferenceKind::DeploymentMvIndex.to_string(); if object.kind != mv_index_kind_string { return Err(Error::NotIndexKind(object.kind.clone())); } let index_object_key = self.insert_deployment_object(object).await?; let index_pointer_key = Self::deployment_index_key(); Ok((index_object_key, index_pointer_key)) } /// Insert a new deployment `MvIndex` into the store, and update the associated index pointer /// to refer to the newly inserted `MvIndex`. /// /// Will fail if the index pointer already exists. #[instrument( name = "frigg.insert_deployment_index", level = "debug", skip_all, fields( si.frontend_object.id = %object.id, si.frontend_object.kind = %object.kind, si.frontend_object.checksum = %object.checksum, ) )] pub async fn insert_deployment_index(&self, object: &FrontendObject) -> Result<KvRevision> { let (index_object_key, index_pointer_key) = self .insert_or_update_deployment_index_preamble(object) .await?; let index_pointer_value = DeploymentIndexPointerValueV2 { index_object_key: index_object_key.into_string(), definition_checksums: materialized_view_definition_checksums().clone(), index_checksum: object.checksum.to_owned(), }; let value = serde_json::to_vec(&index_pointer_value).map_err(Error::Serialize)?; let new_revision = self.store.create(index_pointer_key, value.into()).await?; Ok(new_revision.into()) } /// Insert an updated `MvIndex` into the store, and update the associated index pointer to /// refer to the newly inserted `MvIndex`. /// /// Will fail if the index pointer has been updated since `revision` was fetched. #[instrument( name = "frigg.update_deployment_index", level = "debug", skip_all, fields( si.frontend_object.id = %object.id, si.frontend_object.kind = %object.kind, si.frontend_object.checksum = %object.checksum, ) )] pub async fn update_deployment_index( &self, object: &FrontendObject, revision: KvRevision, ) -> Result<KvRevision> { let (index_object_key, index_pointer_key) = self .insert_or_update_deployment_index_preamble(object) .await?; let index_pointer_value = DeploymentIndexPointerValueV2 { index_object_key: index_object_key.into_string(), definition_checksums: materialized_view_definition_checksums().clone(), index_checksum: object.checksum.to_owned(), }; let value = serde_json::to_vec(&index_pointer_value).map_err(Error::Serialize)?; let new_revision = self .store .update(index_pointer_key, value.into(), revision.0) .await?; Ok(new_revision.into()) } /// Put a new `MvIndex` into the store, and update the associated index pointer to refer /// to the newly inserted `MvIndex`. /// /// Will NOT fail if the index pointer already exists. #[instrument( name = "frigg.put_deployment_index", level = "debug", skip_all, fields( si.frontend_object.id = %object.id, si.frontend_object.kind = %object.kind, si.frontend_object.checksum = %object.checksum, ) )] pub async fn put_deployment_index(&self, object: &FrontendObject) -> Result<KvRevision> { let (index_object_key, index_pointer_key) = self .insert_or_update_deployment_index_preamble(object) .await?; let index_pointer_value = DeploymentIndexPointerValueV2 { index_object_key: index_object_key.into_string(), definition_checksums: materialized_view_definition_checksums().clone(), index_checksum: object.checksum.to_owned(), }; let value = serde_json::to_vec(&index_pointer_value).map_err(Error::Serialize)?; let new_revision = self.store.put(index_pointer_key, value.into()).await?; Ok(new_revision.into()) } #[instrument( name = "frigg.get_deployment_index", level = "debug", skip_all, fields() )] pub async fn get_deployment_index(&self) -> Result<Option<(FrontendObject, KvRevision)>> { let index_pointer_key = Self::deployment_index_key(); let Some((bytes, revision)) = self.get_object_raw_bytes(&index_pointer_key).await? else { return Ok(None); }; let index_pointer_value = match serde_json::from_slice::<DeploymentIndexPointerVersion>(bytes.as_ref()) .map_err(Error::Deserialize)? { DeploymentIndexPointerVersion::V1(_) => return Ok(None), DeploymentIndexPointerVersion::V2(index) => index, }; // TEMPORARY: Definition checksum validation bypassed // This allows deployment indexes to be used even when definition checksums don't match // // TODO: Re-enable this validation once the deployment-level MV definition checksum is decoupled from the change set level MV definition checksum. /* // If the definition checksum for the current set of MVs is not the same as the one the // MvIndex was built for, then the MvIndex is out of date and should not be used at all. if index_pointer_value.definition_checksums != materialized_view_definition_checksums() { debug!( "deployment index pointer is out of date: index checksums: {:?}, expected checksums: {:?}", index_pointer_value.definition_checksums, materialized_view_definition_checksums() ); return Ok(None); } */ debug!( "deployment index checksum validation bypassed (temporary): index checksums: {:?}, expected checksums: {:?}", index_pointer_value.definition_checksums, materialized_view_definition_checksums() ); let object_key = index_pointer_value.index_object_key; let bytes = self .store .get(object_key.to_string()) .await .map_err(Error::EntryGetDeploymentIndex)? .ok_or(Error::IndexObjectNotFound(object_key.into()))?; let object = serde_json::from_slice(bytes.as_ref()).map_err(Error::Deserialize)?; Ok(Some((object, revision))) } #[instrument( name = "frigg.watch_deployment_index", level = "debug", skip_all, fields() )] pub async fn watch_deployment_index(&self) -> Result<Watch> { let index_pointer_key = Self::deployment_index_key(); self.store .watch(index_pointer_key) .await .map_err(Into::into) } #[inline] fn deployment_object_key(kind: &str, id: &str, checksum: &str) -> Subject { Subject::from(format!( "{}.{}.{kind}.{id}.{checksum}", Domain::Object.as_ref(), Scope::Deployment.as_ref(), )) } #[inline] fn deployment_index_key() -> Subject { Subject::from(format!( "{}.{}", Domain::Index.as_ref(), Scope::Deployment.as_ref() )) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server