Skip to main content
Glama
cas.rs4.17 kB
use std::{ collections::HashMap, fmt::Display, sync::Arc, }; use serde::{ Serialize, de::DeserializeOwned, }; use si_events::{ Actor, ContentHash, Tenancy, WebEvent, }; use telemetry::prelude::*; use super::serialize; use crate::{ LayerDbError, error::LayerDbResult, event::{ LayeredEvent, LayeredEventKind, }, layer_cache::LayerCache, persister::{ PersisterClient, PersisterStatusReader, }, }; pub const DBNAME: &str = "cas"; pub const CACHE_NAME: &str = "cas"; pub const PARTITION_KEY: &str = "cas"; #[derive(Debug, Clone)] pub struct CasDb<V> where V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { pub cache: Arc<LayerCache<Arc<V>>>, persister_client: PersisterClient, } impl<V> CasDb<V> where V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { pub fn new(cache: Arc<LayerCache<Arc<V>>>, persister_client: PersisterClient) -> Self { CasDb { cache, persister_client, } } #[instrument(name = "cas.write", level = "debug", skip_all)] pub fn write( &self, value: Arc<V>, web_events: Option<Vec<WebEvent>>, tenancy: Tenancy, actor: Actor, ) -> LayerDbResult<(ContentHash, PersisterStatusReader)> { let (postcard_value, size_hint) = serialize::to_vec(&value)?; let key = ContentHash::new(&postcard_value); let cache_key: Arc<str> = key.to_string().into(); self.cache .insert(cache_key.clone(), value.clone(), size_hint); let event = LayeredEvent::new( LayeredEventKind::CasInsertion, Arc::new(DBNAME.to_string()), cache_key, Arc::new(postcard_value), Arc::new("cas".to_string()), web_events, tenancy, actor, ); let reader = self.persister_client.write_event(event)?; Ok((key, reader)) } pub async fn read(&self, key: &ContentHash) -> LayerDbResult<Option<Arc<V>>> { self.cache.get(key.to_string().into()).await } /// We often need to extract the value from the arc by cloning it (although /// this should be avoided for large values). This will do that, and also /// helpfully convert the value to the type we want to deal with pub async fn try_read_as<T>(&self, key: &ContentHash) -> LayerDbResult<Option<T>> where V: TryInto<T>, <V as TryInto<T>>::Error: Display, { Ok(match self.read(key).await? { None => None, Some(arc_v) => Some( arc_v .as_ref() .clone() .try_into() .map_err(|err| LayerDbError::ContentConversion(err.to_string()))?, ), }) } #[instrument(name = "cas.read_many", level = "debug", skip_all)] pub async fn read_many( &self, keys: &[ContentHash], ) -> LayerDbResult<HashMap<ContentHash, Arc<V>>> { self.cache.get_bulk(keys).await } #[instrument(name = "cas.try_read_many_as", level = "debug", skip_all)] pub async fn try_read_many_as<T>( &self, keys: &[ContentHash], ) -> LayerDbResult<HashMap<ContentHash, T>> where V: TryInto<T>, <V as TryInto<T>>::Error: Display, { let mut result = HashMap::new(); for (key, arc_v) in self.cache.get_bulk(keys).await? { result.insert( key, arc_v .as_ref() .clone() .try_into() .map_err(|err| LayerDbError::ContentConversion(err.to_string()))?, ); } Ok(result) } #[instrument(name = "cas.write_bytes_to_durable_storage", level = "debug", skip_all)] pub async fn write_bytes_to_durable_storage( &self, key: &ContentHash, bytes: &[u8], ) -> LayerDbResult<()> { let key = key.to_string(); self.cache.pg().insert(&key, "cas", bytes).await?; Ok(()) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server