Skip to main content
Glama
cache_updates.rs14.4 kB
use std::sync::Arc; use serde::{ Serialize, de::DeserializeOwned, }; use si_data_nats::NatsClient; use si_events::{ FuncRun, FuncRunLog, change_batch::ChangeBatch, }; use telemetry::prelude::*; use tokio::sync::mpsc::UnboundedReceiver; use tokio_util::{ sync::CancellationToken, task::TaskTracker, }; use ulid::Ulid; use crate::{ error::LayerDbResult, event::{ LayeredEvent, LayeredEventServer, }, layer_cache::LayerCache, }; pub struct CacheUpdatesTask< CasValue, EncryptedSecretValue, WorkspaceSnapshotValue, RebaseBatchValue, SplitSubgraphValue, SplitSupergraphValue, SplitRebaseBatchValue, > where CasValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, EncryptedSecretValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, WorkspaceSnapshotValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, RebaseBatchValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, SplitSubgraphValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, SplitSupergraphValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, SplitRebaseBatchValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { cas_cache: Arc<LayerCache<Arc<CasValue>>>, change_batch_cache: Arc<LayerCache<Arc<ChangeBatch>>>, encrypted_secret_cache: Arc<LayerCache<Arc<EncryptedSecretValue>>>, func_run_cache: Arc<LayerCache<Arc<FuncRun>>>, func_run_log_cache: Arc<LayerCache<Arc<FuncRunLog>>>, rebase_batch_cache: Arc<LayerCache<Arc<RebaseBatchValue>>>, snapshot_cache: Arc<LayerCache<Arc<WorkspaceSnapshotValue>>>, split_subgraph_cache: Arc<LayerCache<Arc<SplitSubgraphValue>>>, split_supergraph_cache: Arc<LayerCache<Arc<SplitSupergraphValue>>>, split_rebase_batch_cache: Arc<LayerCache<Arc<SplitRebaseBatchValue>>>, event_channel: UnboundedReceiver<LayeredEvent>, shutdown_token: CancellationToken, tracker: TaskTracker, } impl< CasValue, EncryptedSecretValue, WorkspaceSnapshotValue, RebaseBatchValue, SplitSubgraphValue, SplitSupergraphValue, 
SplitRebaseBatchValue, > CacheUpdatesTask< CasValue, EncryptedSecretValue, WorkspaceSnapshotValue, RebaseBatchValue, SplitSubgraphValue, SplitSupergraphValue, SplitRebaseBatchValue, > where CasValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, EncryptedSecretValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, WorkspaceSnapshotValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, RebaseBatchValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, SplitSubgraphValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, SplitSupergraphValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, SplitRebaseBatchValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { const NAME: &'static str = "LayerDB::CacheUpdatesTask"; #[allow(clippy::too_many_arguments)] pub async fn create( instance_id: Ulid, nats_client: &NatsClient, cas_cache: Arc<LayerCache<Arc<CasValue>>>, change_batch_cache: Arc<LayerCache<Arc<ChangeBatch>>>, encrypted_secret_cache: Arc<LayerCache<Arc<EncryptedSecretValue>>>, func_run_cache: Arc<LayerCache<Arc<FuncRun>>>, func_run_log_cache: Arc<LayerCache<Arc<FuncRunLog>>>, rebase_batch_cache: Arc<LayerCache<Arc<RebaseBatchValue>>>, snapshot_cache: Arc<LayerCache<Arc<WorkspaceSnapshotValue>>>, split_subgraph_cache: Arc<LayerCache<Arc<SplitSubgraphValue>>>, split_supergraph_cache: Arc<LayerCache<Arc<SplitSupergraphValue>>>, split_snapshot_rebase_batch_cache: Arc<LayerCache<Arc<SplitRebaseBatchValue>>>, shutdown_token: CancellationToken, ) -> LayerDbResult<Self> { let tracker = TaskTracker::new(); let (mut layered_event_server, event_channel) = LayeredEventServer::create(instance_id, nats_client.clone(), shutdown_token.clone()); tracker.spawn(async move { layered_event_server.run().await }); Ok(Self { cas_cache, change_batch_cache, encrypted_secret_cache, func_run_cache, func_run_log_cache, rebase_batch_cache, snapshot_cache, split_subgraph_cache, 
split_supergraph_cache, split_rebase_batch_cache: split_snapshot_rebase_batch_cache, event_channel, shutdown_token, tracker, }) } pub async fn run(mut self) { let shutdown_token = self.shutdown_token.clone(); tokio::select! { _ = self.process_messages() => { } _ = shutdown_token.cancelled() => { debug!(task = Self::NAME, "received cancellation"); } } self.tracker.close(); self.tracker.wait().await; debug!(task = Self::NAME, "shutdown complete"); } pub async fn process_messages(&mut self) { while let Some(event) = self.event_channel.recv().await { let cache_update_task = CacheUpdateTask::new( self.cas_cache.clone(), self.change_batch_cache.clone(), self.encrypted_secret_cache.clone(), self.func_run_cache.clone(), self.func_run_log_cache.clone(), self.snapshot_cache.clone(), self.rebase_batch_cache.clone(), self.split_subgraph_cache.clone(), self.split_supergraph_cache.clone(), self.split_rebase_batch_cache.clone(), ); self.tracker .spawn(async move { cache_update_task.run(event).await }); } } } struct CacheUpdateTask<Q, R, S, T, U, V, W> where Q: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, R: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, S: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, U: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, W: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { cas_cache: Arc<LayerCache<Arc<Q>>>, change_batch_cache: Arc<LayerCache<Arc<ChangeBatch>>>, encrypted_secret_cache: Arc<LayerCache<Arc<R>>>, func_run_cache: Arc<LayerCache<Arc<FuncRun>>>, func_run_log_cache: Arc<LayerCache<Arc<FuncRunLog>>>, snapshot_cache: Arc<LayerCache<Arc<S>>>, rebase_batch_cache: Arc<LayerCache<Arc<T>>>, split_subgraph_cache: Arc<LayerCache<Arc<U>>>, split_supergraph_cache: Arc<LayerCache<Arc<V>>>, split_rebase_batch_cache: Arc<LayerCache<Arc<W>>>, } impl<Q, 
R, S, T, U, V, W> CacheUpdateTask<Q, R, S, T, U, V, W> where Q: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, R: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, S: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, U: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, W: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { #[allow(clippy::too_many_arguments)] fn new( cas_cache: Arc<LayerCache<Arc<Q>>>, change_batch_cache: Arc<LayerCache<Arc<ChangeBatch>>>, encrypted_secret_cache: Arc<LayerCache<Arc<R>>>, func_run_cache: Arc<LayerCache<Arc<FuncRun>>>, func_run_log_cache: Arc<LayerCache<Arc<FuncRunLog>>>, snapshot_cache: Arc<LayerCache<Arc<S>>>, rebase_batch_cache: Arc<LayerCache<Arc<T>>>, split_subgraph_cache: Arc<LayerCache<Arc<U>>>, split_supergraph_cache: Arc<LayerCache<Arc<V>>>, split_rebase_batch_cache: Arc<LayerCache<Arc<W>>>, ) -> CacheUpdateTask<Q, R, S, T, U, V, W> { CacheUpdateTask { cas_cache, change_batch_cache, encrypted_secret_cache, func_run_cache, func_run_log_cache, snapshot_cache, rebase_batch_cache, split_subgraph_cache, split_supergraph_cache, split_rebase_batch_cache, } } async fn process_message(&self, event: LayeredEvent) -> LayerDbResult<()> { match event.event_kind { crate::event::LayeredEventKind::CasInsertion => { if !self.cas_cache.contains(&event.key) { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.cas_cache .insert_from_cache_updates(event.key, serialized_value); } } crate::event::LayeredEventKind::ChangeBatchWrite => { if !self.change_batch_cache.contains(&event.key) { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.change_batch_cache .insert_from_cache_updates(event.key, serialized_value); } } 
crate::event::LayeredEventKind::ChangeBatchEvict => { self.change_batch_cache.evict_from_cache_updates(event.key); } crate::event::LayeredEventKind::EncryptedSecretInsertion => { if !self.encrypted_secret_cache.contains(&event.key) { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.encrypted_secret_cache .insert_from_cache_updates(event.key, serialized_value); } } crate::event::LayeredEventKind::FuncRunWrite => { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.func_run_cache .insert_or_update_from_cache_updates(event.key, serialized_value); } crate::event::LayeredEventKind::FuncRunLogWrite => { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.func_run_log_cache .insert_or_update_from_cache_updates(event.key, serialized_value); } crate::event::LayeredEventKind::Raw => { warn!("Recevied a 'raw' layered event kind - this is for testing only. 
Bug!"); } crate::event::LayeredEventKind::RebaseBatchWrite => { if !self.rebase_batch_cache.contains(&event.key) { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.rebase_batch_cache .insert_from_cache_updates(event.key, serialized_value); } } crate::event::LayeredEventKind::RebaseBatchEvict => { self.rebase_batch_cache.evict_from_cache_updates(event.key); } crate::event::LayeredEventKind::SnapshotWrite => { if !self.snapshot_cache.contains(&event.key) { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.snapshot_cache .insert_from_cache_updates(event.key, serialized_value); } } crate::event::LayeredEventKind::SnapshotEvict => { self.snapshot_cache.evict_from_cache_updates(event.key); } crate::event::LayeredEventKind::SplitRebaseBatchEvict => { self.split_rebase_batch_cache .evict_from_cache_updates(event.key); } crate::event::LayeredEventKind::SplitRebaseBatchWrite => { if !self.split_rebase_batch_cache.contains(&event.key) { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.split_rebase_batch_cache .insert_from_cache_updates(event.key, serialized_value); } } crate::event::LayeredEventKind::SplitSnapshotSubGraphEvict => { self.split_subgraph_cache .evict_from_cache_updates(event.key); } crate::event::LayeredEventKind::SplitSnapshotSubGraphWrite => { if !self.split_subgraph_cache.contains(&event.key) { let serialized_value = Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.split_subgraph_cache .insert_from_cache_updates(event.key, serialized_value); } } crate::event::LayeredEventKind::SplitSnapshotSuperGraphEvict => { self.split_supergraph_cache .evict_from_cache_updates(event.key); } crate::event::LayeredEventKind::SplitSnapshotSuperGraphWrite => { if !self.split_supergraph_cache.contains(&event.key) { let serialized_value = 
Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone()); self.split_supergraph_cache .insert_from_cache_updates(event.key, serialized_value); } } } Ok(()) } async fn run(&self, event: LayeredEvent) { match self.process_message(event).await { Ok(()) => {} Err(e) => { error!(error = %e, "error processing layerdb cache update message"); } } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.