Skip to main content
Glama
hybrid_cache.rs (13.1 kB)
use std::{ cmp::max, path::{ Path, PathBuf, }, sync::{ Arc, LazyLock, }, }; use foyer::{ DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions, RateLimitPicker, RecoverMode, TokioRuntimeOptions, }; use mixtrics::registry::opentelemetry_0_26::OpenTelemetryMetricsRegistry; use serde::{ Deserialize, Serialize, de::DeserializeOwned, }; use telemetry::{ opentelemetry::global, tracing::{ error, info, }, }; use tokio::fs; use crate::{ LayerDbError, db::serialize, error::LayerDbResult, }; const FOYER_DISK_CACHE_MINUMUM: u64 = 1024 * 1024 * 1024; // 1gb const DEFAULT_MEMORY_RESERVED_PERCENT: u8 = 40; const DEFAULT_MEMORY_USABLE_MAX_PERCENT: u8 = 100; const DEFAULT_DISK_RESERVED_PERCENT: u8 = 5; const DEFAULT_DISK_USAGE_MAX_PERCENT: u8 = 100; const DEFAULT_DISK_CACHE_RATE_LIMIT: usize = 1024 * 1024 * 1024; const DEFAULT_DISK_BUFFER_SIZE: usize = 1024 * 1024 * 128; // 128mb const DEFAULT_DISK_BUFFER_FLUSHERS: usize = 2; const DEFAULT_DISK_INDEXER_SHARDS: usize = 64; const DEFAULT_DISK_RECLAIMERS: usize = 2; const DEFAULT_DISK_RECOVER_CONCURRENCY: usize = 8; static TOTAL_SYSTEM_MEMORY_BYTES: LazyLock<u64> = LazyLock::new(|| { let sys = sysinfo::System::new_all(); sys.total_memory() }); #[derive(Clone, Debug, Deserialize, Serialize)] enum MaybeDeserialized<V> where V: Serialize + Clone + Send + Sync + 'static, { RawBytes(Vec<u8>), DeserializedValue { value: V, size_hint: usize }, } #[derive(Clone, Debug)] pub struct Cache<V> where V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { cache: HybridCache<Arc<str>, MaybeDeserialized<V>>, } impl<V> Cache<V> where V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { pub async fn new(config: CacheConfig) -> LayerDbResult<Self> { let total_memory_bytes = *TOTAL_SYSTEM_MEMORY_BYTES; let memory_cache_capacity_bytes = { // Subtract reserved memory percentage to determine total usable cache memory let total_usable_memory_bytes = (total_memory_bytes as f64 * (1.0 - 
(config.memory_reserved_percent as f64 / 100.0))) .floor() as u64; // Compute final usable memory as a percentage of the maximum usable memory let computed_memory_cache_capacity_bytes = (total_usable_memory_bytes as f64 * (config.memory_usable_max_percent as f64 / 100.0)) .floor() as u64; computed_memory_cache_capacity_bytes.try_into()? }; let cache_meter_name: &'static str = config.name.clone().leak(); let builder = HybridCacheBuilder::new() .with_name(config.name.clone()) .with_metrics_registry(Box::new(OpenTelemetryMetricsRegistry::new(global::meter( cache_meter_name, )))) .memory(memory_cache_capacity_bytes) .with_weighter( |_key: &Arc<str>, value: &MaybeDeserialized<V>| match value { MaybeDeserialized::RawBytes(bytes) => bytes.len(), MaybeDeserialized::DeserializedValue { size_hint, .. } => *size_hint, }, ) .storage(Engine::Large) .with_runtime_options(foyer::RuntimeOptions::Unified(TokioRuntimeOptions { max_blocking_threads: 0, worker_threads: 0, })) .with_admission_picker(Arc::new(RateLimitPicker::new( config.disk_admission_rate_limit, ))) .with_large_object_disk_cache_options( LargeEngineOptions::new() .with_buffer_pool_size(config.disk_buffer_size) .with_eviction_pickers(vec![Box::<FifoPicker>::default()]) .with_flushers(config.disk_buffer_flushers) .with_recover_concurrency(config.disk_recover_concurrency) .with_indexer_shards(config.disk_indexer_shards) .with_reclaimers(config.disk_reclaimers), ) .with_recover_mode(RecoverMode::Quiet); let builder = if config.disk_layer { fs::create_dir_all(config.disk_path.as_path()).await?; // Compute total disk which is in use for `disk_path` let total_disk_bytes = fs4::total_space(config.disk_path.as_path())?; let disk_cache_capacity_bytes = { // Subtract reserved disk percentage to determine total usable cache disk let total_usable_disk_bytes = (total_disk_bytes as f64 * (1.0 - (config.disk_reserved_percent as f64 / 100.0))) .floor() as u64; // Compute final usable disk as a percentage of the maximum usable disk let 
computed_disk_cache_capacity_bytes = (total_usable_disk_bytes as f64 * (config.disk_usable_max_percent as f64 / 100.0)) .floor() as u64; // Ensure that the computed value is at least as big as the Foyer minimum max(computed_disk_cache_capacity_bytes, FOYER_DISK_CACHE_MINUMUM).try_into()? }; info!( cache.name = &config.name, cache.disk.layer = config.disk_layer, cache.disk.total_bytes = total_disk_bytes, cache.disk.size_bytes = disk_cache_capacity_bytes, cache.disk.reserved_percent = config.disk_reserved_percent, cache.disk.usable_max_percent = config.disk_usable_max_percent, cache.disk.rate_limit = config.disk_admission_rate_limit, cache.memory.total_bytes = total_memory_bytes, cache.memory.size_bytes = memory_cache_capacity_bytes, cache.memory.reserved_percent = config.memory_reserved_percent, cache.memory.usable_max_percent = config.memory_usable_max_percent, "creating cache", ); builder.with_device_options( DirectFsDeviceOptions::new(config.disk_path) .with_capacity(disk_cache_capacity_bytes), ) } else { info!( cache.name = &config.name, cache.disk.layer = config.disk_layer, cache.disk.reserved_percent = config.disk_reserved_percent, cache.disk.usable_max_percent = config.disk_usable_max_percent, cache.disk.rate_limit = config.disk_admission_rate_limit, cache.memory.total_bytes = total_memory_bytes, cache.memory.size_bytes = memory_cache_capacity_bytes, cache.memory.reserved_percent = config.memory_reserved_percent, cache.memory.usable_max_percent = config.memory_usable_max_percent, "creating cache", ); builder }; let cache: HybridCache<Arc<str>, MaybeDeserialized<V>> = builder .build() .await .map_err(|e| LayerDbError::Foyer(e.into()))?; Ok(Self { cache }) } pub async fn get(&self, key: Arc<str>) -> Option<V> { if let Ok(Some(entry)) = self.cache.obtain(key.clone()).await { return self.maybe_deserialize(key, entry.value().clone()).await; } None } pub async fn get_from_memory(&self, key: Arc<str>) -> Option<V> { if let Some(entry) = self.cache.memory().get(&key) 
{ return self.maybe_deserialize(key, entry.value().clone()).await; } None } async fn maybe_deserialize(&self, key: Arc<str>, entry: MaybeDeserialized<V>) -> Option<V> { match entry { MaybeDeserialized::DeserializedValue { value, .. } => Some(value.clone()), MaybeDeserialized::RawBytes(bytes) => { // If we fail to deserialize the raw bytes for some reason, pretend that we never // had the key in the first place, and also remove it from the cache. match serialize::from_bytes_async::<V>(&bytes).await { Ok(deserialized) => { self.insert(key, deserialized.clone(), bytes.len()); Some(deserialized) } Err(e) => { error!( "Failed to deserialize stored bytes from memory cache for key ({:?}): {}", key, e ); self.remove(&key); None } } } } } pub fn insert(&self, key: Arc<str>, value: V, size_hint: usize) { self.cache.insert( key, MaybeDeserialized::DeserializedValue { value, size_hint }, ); } pub fn insert_raw_bytes(&self, key: Arc<str>, raw_bytes: Vec<u8>) { self.cache .insert(key, MaybeDeserialized::RawBytes(raw_bytes)); } pub fn remove(&self, key: &str) { self.cache.remove(key); } pub fn contains(&self, key: &str) -> bool { self.cache.contains(key) } pub async fn close(&self) -> LayerDbResult<()> { self.cache .close() .await .map_err(|e| LayerDbError::Foyer(e.into()))?; Ok(()) } } #[derive(Clone, Debug, Deserialize, Serialize)] pub struct CacheConfig { name: String, memory_reserved_percent: u8, memory_usable_max_percent: u8, disk_layer: bool, disk_reserved_percent: u8, disk_usable_max_percent: u8, disk_admission_rate_limit: usize, disk_buffer_size: usize, disk_buffer_flushers: usize, disk_indexer_shards: usize, disk_path: PathBuf, disk_reclaimers: usize, disk_recover_concurrency: usize, } impl Default for CacheConfig { fn default() -> Self { let disk_path = tempfile::TempDir::with_prefix("default-cache-") .expect("unable to create tmp dir for layerdb") .path() .to_path_buf(); Self { name: "default".to_string(), memory_reserved_percent: DEFAULT_MEMORY_RESERVED_PERCENT, 
memory_usable_max_percent: DEFAULT_MEMORY_USABLE_MAX_PERCENT, disk_layer: true, disk_reserved_percent: DEFAULT_DISK_RESERVED_PERCENT, disk_usable_max_percent: DEFAULT_DISK_USAGE_MAX_PERCENT, disk_admission_rate_limit: DEFAULT_DISK_CACHE_RATE_LIMIT, disk_buffer_size: DEFAULT_DISK_BUFFER_SIZE, disk_buffer_flushers: DEFAULT_DISK_BUFFER_FLUSHERS, disk_indexer_shards: DEFAULT_DISK_INDEXER_SHARDS, disk_path, disk_reclaimers: DEFAULT_DISK_RECLAIMERS, disk_recover_concurrency: DEFAULT_DISK_RECOVER_CONCURRENCY, } } } impl CacheConfig { /// Returns the size of system memory, in bytes. #[inline] pub fn total_system_memory_bytes() -> u64 { *TOTAL_SYSTEM_MEMORY_BYTES } /// Updates the disk layer strategy (i.e. whether to use a disk cache layer or not). pub fn disk_layer(mut self, value: bool) -> Self { self.disk_layer = value; self } // Updates the name for the cache (only used in logs for now). pub fn with_name(mut self, name: impl ToString) -> Self { self.name = name.to_string(); self } /// Updates the reserve percentage of memory which will *never* be used for the cache. /// /// Default is `40`%. pub fn memory_reserved_percent(mut self, value: u8) -> Self { self.memory_reserved_percent = value; self } /// Updates the maximum percentage of usable memory to use for the cache. /// /// Default is `100`%. /// /// Note that this percentage does *not* include the reserved percentage. pub fn memory_usable_max_percent(mut self, value: u8) -> Self { self.memory_usable_max_percent = value; self } /// Updates the reserved percentage of the disk which will *never* be used for the cache. /// /// Default is `5`%. pub fn disk_reserved_percent(mut self, value: u8) -> Self { self.disk_reserved_percent = value; self } /// Updates the maximum percentage of the usable disk to use for the cache. /// /// Default is `100`%. /// /// Note that this percentage does *not* include the reserved percentage. 
pub fn disk_usable_max_percent(mut self, value: u8) -> Self { self.disk_usable_max_percent = value; self } /// Appends an additional path to the existing disk path pub fn with_path_join(mut self, path: impl AsRef<Path>) -> Self { self.disk_path = self.disk_path.join(path); self } /// Returns the disk path for the cache pub fn disk_path(&self) -> &Path { &self.disk_path } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.