Skip to main content
Glama
split_snapshot_rebase_batch.rs (5.58 kB)
use std::{ sync::Arc, time::Instant, }; use serde::{ Serialize, de::DeserializeOwned, }; use si_events::{ Actor, Tenancy, WebEvent, split_snapshot_rebase_batch_address::SplitSnapshotRebaseBatchAddress, }; use telemetry::prelude::*; use super::serialize; use crate::{ error::LayerDbResult, event::{ LayeredEvent, LayeredEventKind, }, layer_cache::LayerCache, persister::{ PersisterClient, PersisterStatusReader, }, }; pub const DBNAME: &str = "split_snapshot_rebase_batches"; pub const CACHE_NAME: &str = "split_snapshot_rebase_batches"; pub const PARTITION_KEY: &str = "split_snapshot_rebase_batches"; #[derive(Debug, Clone)] pub struct SplitSnapshotRebaseBatchDb<V> where V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { pub cache: Arc<LayerCache<Arc<V>>>, persister_client: PersisterClient, } impl<V> SplitSnapshotRebaseBatchDb<V> where V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { pub fn new(cache: Arc<LayerCache<Arc<V>>>, persister_client: PersisterClient) -> Self { Self { cache, persister_client, } } #[instrument(name = "split_snapshot_rebase_batch.write", level = "debug", skip_all)] pub fn write( &self, value: Arc<V>, web_events: Option<Vec<WebEvent>>, tenancy: Tenancy, actor: Actor, ) -> LayerDbResult<(SplitSnapshotRebaseBatchAddress, PersisterStatusReader)> { let value_clone = value.clone(); let (postcard_value, size_hint) = serialize::to_vec(&value)?; let key = SplitSnapshotRebaseBatchAddress::new(&postcard_value); let cache_key: Arc<str> = key.to_string().into(); self.cache.insert(cache_key.clone(), value_clone, size_hint); let event = LayeredEvent::new( LayeredEventKind::SplitRebaseBatchWrite, Arc::new(DBNAME.to_string()), cache_key, Arc::new(postcard_value), Arc::new("split_snapshot_rebase_batches".to_string()), web_events, tenancy, actor, ); let reader = self.persister_client.write_event(event)?; Ok((key, reader)) } #[instrument( name = "split_snapshot_rebase_batch.read", level = "debug", skip_all, fields( 
si.split_snapshot_rebase_batch.address = %key, ) )] pub async fn read( &self, key: &SplitSnapshotRebaseBatchAddress, ) -> LayerDbResult<Option<Arc<V>>> { self.cache.get(key.to_string().into()).await } #[instrument( name = "split_snapshot_rebase_batch.read_wait_for_memory", level = "debug", skip_all, fields( si.layer_cache.memory_cache.hit = Empty, si.layer_cache.memory_cache.read_wait_ms = Empty, si.layer_cache.memory_cache.retries = Empty, si.split_snapshot_rebase_batch.address = %key, ) )] pub async fn read_wait_for_memory( &self, key: &SplitSnapshotRebaseBatchAddress, ) -> LayerDbResult<Option<Arc<V>>> { let span = current_span_for_instrument_at!("debug"); let key: Arc<str> = key.to_string().into(); const MAX_TRIES: i32 = 2000; let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(1)); let mut tried = 0; let read_wait = Instant::now(); while tried < MAX_TRIES { if let Some(v) = self.cache.cache().get_from_memory(key.clone()).await { span.record("si.layer_cache.memory_cache.hit", true); span.record( "si.layer_cache.memory_cache.read_wait_ms", read_wait.elapsed().as_millis(), ); span.record("si.layer_cache.memory_cache.retries", tried); return Ok(Some(v)); } tried += 1; interval.tick().await; } span.record("si.layer_cache.memory_cache.hit", false); self.cache.get(key.to_string().into()).await } #[instrument( name = "split_snapshot_rebase_batch.evict", level = "debug", skip_all, fields( si.split_snapshot_rebase_batch.address = %key, ) )] pub async fn evict( &self, key: &SplitSnapshotRebaseBatchAddress, tenancy: Tenancy, actor: Actor, ) -> LayerDbResult<PersisterStatusReader> { let cache_key = key.to_string(); self.cache.remove_from_memory(&cache_key); let event = LayeredEvent::new( LayeredEventKind::SplitRebaseBatchEvict, Arc::new(DBNAME.to_string()), cache_key.into(), Arc::new(Vec::new()), Arc::new("split_snapshot_rebase_batch".to_string()), None, tenancy, actor, ); let reader = self.persister_client.evict_event(event)?; Ok(reader) } 
#[instrument( name = "split_snapshot_rebase_batch.read_bytes_from_durable_storage", level = "debug", skip_all, fields( si.split_snapshot_rebase_batch.address = %key, ) )] pub async fn read_bytes_from_durable_storage( &self, key: &SplitSnapshotRebaseBatchAddress, ) -> LayerDbResult<Option<Vec<u8>>> { self.cache .get_bytes_from_durable_storage(key.to_string().into()) .await } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.