retry_queue.rs
//! Persistent retry queue for handling transient PostgreSQL write failures.
//!
//! # Overview
//!
//! The retry queue system ensures data durability by persisting failed writes to disk
//! and retrying them with exponential backoff. Each LayerCache gets its own retry queue
//! directory with individual files per failed write.
//!
//! # Architecture
//!
//! - **RetryQueueManager**: Central component managing all retry queues
//! - **QueueState**: Per-cache backoff and file tracking
//! - **File format**: Postcard-serialized `LayeredEvent` with ULID filenames for ordering
//! - **Backoff**: Exponential (100ms → 5s max), resets on success per queue
//!
//! # Directory Structure
//!
//! ```text
//! {base_path}/
//! ├── cas_retries/
//! │   ├── 01JQRS123456789ABCDEFG.pending
//! │   └── 01JQRS234567890BCDEFGH.pending
//! ├── workspace_snapshot_retries/
//! │   └── 01JQRS345678901CDEFGHI.pending
//! └── ...
//! ```
//!
//! # Usage
//!
//! The retry queue is integrated into `PersisterTask`:
//!
//! 1. New writes attempt direct PostgreSQL write
//! 2. On transient failure (connection/network), event is enqueued to disk
//! 3. Background loop continuously attempts retries with backoff
//! 4. On success, file is deleted and backoff resets
//! 5. On startup, existing queues are scanned and retried immediately
//!
//! # Configuration
//!
//! Configure via `RetryQueueConfig`:
//! - `base_path`: Directory for all retry queues (default: alongside disk cache)
//! - `initial_backoff`: Starting delay (default: 100ms)
//! - `max_backoff`: Maximum delay (default: 5s)
//! - `backoff_multiplier`: Growth factor (default: 2.0)

use std::{
    collections::{
        BTreeSet,
        HashMap,
    },
    ffi::OsString,
    path::PathBuf,
    time::{
        Duration,
        Instant,
    },
};

use telemetry_utils::{
    gauge,
    monotonic,
};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::{
    BackendType,
    error::{
        LayerDbError,
        LayerDbResult,
    },
    event::LayeredEvent,
};

/// Configuration for retry queue behavior
#[derive(Debug, Clone)]
pub struct RetryQueueConfig {
    /// Base directory for all retry queues
    pub base_path: PathBuf,
    /// Initial backoff duration (default: 100ms)
    pub initial_backoff: Duration,
    /// Maximum backoff duration (default: 5 seconds)
    pub max_backoff: Duration,
    /// Backoff multiplier (default: 2.0)
    pub backoff_multiplier: f64,
}

impl Default for RetryQueueConfig {
    fn default() -> Self {
        Self {
            base_path: PathBuf::from("retry_queues"),
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(5),
            backoff_multiplier: 2.0,
        }
    }
}

/// Tracks backoff state for a single queue
#[derive(Debug)]
struct QueueState {
    retry_dir: PathBuf,
    #[allow(dead_code)] // Stored for potential future logging/debugging
    backend: BackendType,
    current_backoff: Duration,
    next_retry_time: Instant,
    pending_files: BTreeSet<OsString>,
}

impl QueueState {
    fn new(retry_dir: PathBuf, backend: BackendType, initial_backoff: Duration) -> Self {
        Self {
            retry_dir,
            backend,
            current_backoff: initial_backoff,
            next_retry_time: Instant::now(),
            pending_files: BTreeSet::new(),
        }
    }

    /// Record a failed retry attempt and update backoff
    fn record_failure(
        &mut self,
        max_backoff: Duration,
        multiplier: f64,
        initial_backoff: Duration,
    ) {
        self.current_backoff = calculate_next_backoff(
            self.current_backoff,
            max_backoff,
            multiplier,
            initial_backoff,
        );
        self.next_retry_time = Instant::now() + self.current_backoff;
    }

    /// Record a successful retry and reset backoff
    fn record_success(&mut self, initial_backoff: Duration) {
        self.current_backoff = initial_backoff;
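        // Allow the next retry attempt to run immediately; backoff only grows again on failure.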
        self.next_retry_time = Instant::now();
    }

    /// Check if this queue is ready to retry
    fn is_ready(&self) -> bool {
        Instant::now() >= self.next_retry_time && !self.pending_files.is_empty()
    }
}

/// Handle for tracking a retry attempt
#[derive(Debug)]
pub struct RetryHandle {
    pub(crate) cache_name: String,
    pub(crate) backend: BackendType,
    pub(crate) filename: OsString,
}

/// Calculate the next backoff duration using exponential backoff with full jitter
///
/// Full jitter prevents thundering herd by randomizing retry times between the
/// minimum (initial_backoff) and maximum (calculated exponential backoff capped at max).
/// This follows the AWS recommendation and matches the pattern in dal/job/consumer.rs.
fn calculate_next_backoff(
    current: Duration,
    max: Duration,
    multiplier: f64,
    initial: Duration,
) -> Duration {
    use rand::Rng;

    // Calculate next exponential backoff value, clamping to max
    let next_secs = current.as_secs_f64() * multiplier;
    let next_backoff = std::cmp::min(Duration::from_secs_f64(next_secs), max);

    // Apply full jitter: random value between initial_backoff and calculated next_backoff
    let mut rng = rand::thread_rng();
    let min_micros = initial.as_micros() as u64;
    let max_micros = next_backoff.as_micros() as u64;
    let jittered_micros = rng.gen_range(min_micros..=max_micros);

    Duration::from_micros(jittered_micros)
}

/// Determine if a LayerDbError represents a transient failure that should be retried
pub fn is_retryable_error(error: &LayerDbError) -> bool {
    match error {
        // Direct PG errors are always retryable (connection/pool issues)
        LayerDbError::Pg(_) | LayerDbError::PgPool(_) => true,
        // For PersisterTaskFailed, only retry if PostgreSQL failed
        // NATS-only failures are not retried - remote services will fetch from PG on cache miss
        LayerDbError::PersisterTaskFailed(task_error) => task_error.pg_error.is_some(),
        // All other errors are non-retryable
        _ => false,
    }
}

/// Messages sent to RetryQueueManager for queue operations
#[derive(Debug)]
pub enum RetryQueueMessage {
    /// Enqueue a failed event for retry
    Enqueue {
        event: LayeredEvent,
        backend: BackendType,
    },
    /// Mark a retry attempt as successful (removes from queue)
    MarkSuccess(RetryHandle),
    /// Mark a retry attempt as failed with retryable error (updates backoff)
    MarkRetryableFailure(RetryHandle, LayerDbError),
    /// Mark a retry attempt as permanently failed (removes from queue)
    MarkPermanentFailure(RetryHandle),
}

/// Write a LayeredEvent to a retry queue file with fsync for durability
///
/// Uses write-to-temp-then-rename for atomicity, with fsync before rename
/// to ensure data is on disk even if system crashes immediately after write.
async fn write_retry_file(path: &std::path::Path, event: &LayeredEvent) -> LayerDbResult<()> {
    use tokio::{
        fs,
        io::AsyncWriteExt,
    };

    let bytes = postcard::to_stdvec(event)?;

    // Write to temp file first, then atomic rename
    let temp_path = path.with_extension("tmp");

    // Open file for writing
    let mut file = fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(&temp_path)
        .await
        .map_err(LayerDbError::RetryQueueFileWrite)?;

    // Write data to file
    file.write_all(&bytes)
        .await
        .map_err(LayerDbError::RetryQueueFileWrite)?;

    // Flush to disk - this is the critical durability guarantee
    file.sync_all()
        .await
        .map_err(LayerDbError::RetryQueueFileWrite)?;

    // Explicitly close file before rename (drop)
    drop(file);

    // Atomic rename to final location
    fs::rename(&temp_path, path)
        .await
        .map_err(LayerDbError::RetryQueueFileWrite)?;

    Ok(())
}

/// Read a LayeredEvent from a retry queue file
async fn read_retry_file(path: &std::path::Path) -> LayerDbResult<LayeredEvent> {
    use tokio::fs;

    let bytes = fs::read(path)
        .await
        .map_err(LayerDbError::RetryQueueFileRead)?;
    let event = postcard::from_bytes(&bytes)?;

    Ok(event)
}

/// Delete a retry queue file
async fn delete_retry_file(path: &std::path::Path) -> LayerDbResult<()> {
    use tokio::fs;

    fs::remove_file(path)
        .await
        .map_err(LayerDbError::RetryQueueFileDelete)?;

    Ok(())
}

/// Manages persistent retry queues for all LayerCache instances
#[derive(Debug)]
pub struct RetryQueueManager {
    config: RetryQueueConfig,
    queues: HashMap<(String, BackendType), QueueState>,
}

impl RetryQueueManager {
    pub fn new(config: RetryQueueConfig) -> Self {
        Self {
            config,
            queues: HashMap::new(),
        }
    }

    /// Get retry directory path for a specific backend
    /// Uses BackendType::as_ref() to get snake_case backend name
    fn retry_dir_for_backend(&self, cache_name: &str, backend: BackendType) -> PathBuf {
        self.config
            .base_path
            .join(format!("{}_retries_{}", cache_name, backend.as_ref()))
    }

    /// Run the retry queue manager as an independent task
    pub async fn run(
        mut self,
        mut queue_rx: mpsc::UnboundedReceiver<RetryQueueMessage>,
        ready_tx: mpsc::UnboundedSender<(LayeredEvent, RetryHandle)>,
        shutdown_token: CancellationToken,
    ) {
        use telemetry::prelude::*;

        loop {
            tokio::select! {
                biased;

                _ = shutdown_token.cancelled() => {
                    debug!("RetryQueueManager received shutdown signal");
                    break;
                }
                Some(msg) = queue_rx.recv() => {
                    self.handle_message(msg).await;
                }
                result = self.wait_for_ready_retry() => {
                    let _ = ready_tx.send(result);
                }
            }
        }

        debug!("RetryQueueManager shutdown complete");
    }

    /// Handle incoming messages from PersisterTask and spawned tasks
    async fn handle_message(&mut self, msg: RetryQueueMessage) {
        use telemetry::prelude::*;

        match msg {
            RetryQueueMessage::Enqueue { event, backend } => {
                if let Err(err) = self.enqueue(event, backend).await {
                    error!(error = ?err, "failed to enqueue for retry");
                }
            }
            RetryQueueMessage::MarkSuccess(handle) => {
                if let Err(err) = self.mark_success(handle).await {
                    error!(error = ?err, "failed to mark retry as successful");
                }
            }
            RetryQueueMessage::MarkRetryableFailure(handle, error) => {
                self.mark_failure(handle, &error);
            }
            RetryQueueMessage::MarkPermanentFailure(handle) => {
                self.mark_permanent_failure(handle).await;
            }
        }
    }

    /// Scan filesystem for existing retry queue directories and load pending files
    pub async fn scan_existing_queues(&mut self, cache_names: &[&str]) -> LayerDbResult<()> {
        use telemetry::prelude::*;
        use tokio::fs;

        for &cache_name in cache_names {
            // Check for OLD format: {cache_name}_retries/
            let old_retry_dir = self.config.base_path.join(format!("{cache_name}_retries"));
            if old_retry_dir.exists() {
                info!(
                    cache.name = cache_name,
                    "found old retry queue format, migrating to backend-specific queues"
                );
                // Migrate old files to Postgres queue (backwards compatibility)
                self.migrate_old_queue_files(&old_retry_dir, cache_name, BackendType::Postgres)
                    .await?;
            }

            // Scan NEW format: {cache_name}_retries_postgres/ and {cache_name}_retries_s3/
            for backend in [BackendType::Postgres, BackendType::S3] {
                let retry_dir = self.retry_dir_for_backend(cache_name, backend);
                if !retry_dir.exists() {
                    continue;
                }

                // Read directory entries
                let mut entries = fs::read_dir(&retry_dir)
                    .await
                    .map_err(LayerDbError::RetryQueueDirRead)?;

                let mut pending_files = BTreeSet::new();
                while let Some(entry) = entries
                    .next_entry()
                    .await
                    .map_err(LayerDbError::RetryQueueDirRead)?
                {
                    let path = entry.path();
                    // Only process .pending files
                    if path.extension() == Some(std::ffi::OsStr::new("pending")) {
                        pending_files.insert(entry.file_name());
                    }
                }

                if !pending_files.is_empty() {
                    let queue_depth = pending_files.len();
                    info!(
                        cache.name = cache_name,
                        backend = ?backend,
                        queue.depth = queue_depth,
                        "found existing retry queue on startup"
                    );

                    gauge!(
                        layer_cache_retry_queue_depth = queue_depth,
                        cache_name = cache_name,
                        backend = backend.as_ref()
                    );

                    let mut state =
                        QueueState::new(retry_dir, backend, self.config.initial_backoff);
                    state.pending_files = pending_files;
                    // Retry immediately on startup
                    state.next_retry_time = Instant::now();

                    self.queues.insert((cache_name.to_string(), backend), state);
                }
            }
        }

        Ok(())
    }

    /// Migrate old retry queue files to backend-specific directories
    async fn migrate_old_queue_files(
        &mut self,
        old_dir: &std::path::Path,
        cache_name: &str,
        target_backend: BackendType,
    ) -> LayerDbResult<()> {
        use telemetry::prelude::*;
        use tokio::fs;

        let new_dir = self.retry_dir_for_backend(cache_name, target_backend);

        // Create new directory
        fs::create_dir_all(&new_dir)
            .await
            .map_err(LayerDbError::RetryQueueDirCreate)?;

        // Move all .pending files
        let mut entries = fs::read_dir(old_dir)
            .await
            .map_err(LayerDbError::RetryQueueDirRead)?;

        let mut migrated_count = 0;
        while let Some(entry) = entries
            .next_entry()
            .await
            .map_err(LayerDbError::RetryQueueDirRead)?
        {
            let old_path = entry.path();
            if old_path.extension() == Some(std::ffi::OsStr::new("pending")) {
                let file_name = entry.file_name();
                let new_path = new_dir.join(&file_name);

                // Move file atomically
                fs::rename(&old_path, &new_path)
                    .await
                    .map_err(LayerDbError::RetryQueueFileWrite)?;

                migrated_count += 1;
            }
        }

        // Try to remove old directory (will fail if not empty, which is fine)
        let _ = fs::remove_dir(old_dir).await;

        info!(
            cache.name = cache_name,
            backend = ?target_backend,
            migrated_files = migrated_count,
            "migrated old retry queue files to new format"
        );

        Ok(())
    }

    /// Enqueue a failed write for retry
    pub async fn enqueue(
        &mut self,
        event: LayeredEvent,
        backend: BackendType,
    ) -> LayerDbResult<()> {
        use telemetry::prelude::*;
        use tokio::fs;
        use ulid::Ulid;

        let cache_name = event.payload.db_name.to_string();
        let retry_dir = self.retry_dir_for_backend(&cache_name, backend);

        // Ensure retry directory exists
        fs::create_dir_all(&retry_dir)
            .await
            .map_err(LayerDbError::RetryQueueDirCreate)?;

        // Generate ULID for ordering and uniqueness
        let ulid = Ulid::new();
        let filename = format!("{ulid}.pending");
        let file_path = retry_dir.join(&filename);

        // Write event to disk
        write_retry_file(&file_path, &event).await?;

        // Get or create queue state
        let state = self
            .queues
            .entry((cache_name.clone(), backend))
            .or_insert_with(|| QueueState::new(retry_dir, backend, self.config.initial_backoff));

        // Add filename to pending set
        state.pending_files.insert(filename.into());
        let queue_depth = state.pending_files.len();

        info!(
            cache.name = %cache_name,
            backend = ?backend,
            event.kind = ?event.event_kind,
            event.ulid = %ulid,
            queue.depth = queue_depth,
            "persister write failed, enqueued for retry"
        );

        monotonic!(
            layer_cache_retry_queue_enqueued = 1,
            cache_name = cache_name.clone(),
            backend = backend.as_ref()
        );
        gauge!(
            layer_cache_retry_queue_depth = queue_depth,
            cache_name = cache_name,
            backend = backend.as_ref()
        );

        Ok(())
    }

    /// Check if an error should result in enqueueing for retry
    pub fn should_enqueue(&self, error: &LayerDbError) -> bool {
        is_retryable_error(error)
    }

    /// Get the next retry time for a specific cache queue (for testing/observability)
    pub fn next_retry_time(&self, cache_name: &str, backend: BackendType) -> Option<Instant> {
        self.queues
            .get(&(cache_name.to_string(), backend))
            .map(|state| state.next_retry_time)
    }

    /// Try to get a ready retry without blocking
    async fn try_get_ready_retry(&mut self) -> Option<(LayeredEvent, RetryHandle)> {
        use telemetry::prelude::*;

        // Find first ready queue with pending files
        let (cache_name, backend) = self
            .queues
            .iter()
            .find(|(_, state)| state.is_ready())
            .map(|((name, backend), _)| (name.clone(), *backend))?;

        let state = self.queues.get_mut(&(cache_name.clone(), backend))?;

        // Get first filename (BTreeSet is sorted)
        let filename = state.pending_files.iter().next()?.clone();
        let file_path = state.retry_dir.join(&filename);

        // Read event from disk
        match read_retry_file(&file_path).await {
            Ok(event) => {
                debug!(
                    cache.name = %cache_name,
                    backend = ?backend,
                    filename = ?filename,
                    backoff_ms = state.current_backoff.as_millis(),
                    "attempting retry from queue"
                );

                // Remove from pending files immediately to prevent duplicate retries
                state.pending_files.remove(&filename);

                let handle = RetryHandle {
                    cache_name,
                    backend,
                    filename,
                };

                Some((event, handle))
            }
            Err(err) => {
                error!(
                    cache.name = %cache_name,
                    backend = ?backend,
                    filename = ?filename,
                    error = %err,
                    "failed to read retry file, skipping"
                );

                // Remove corrupted file from tracking
                state.pending_files.remove(&filename);

                None
            }
        }
    }

    /// Calculate how long to sleep until the next queue becomes ready
    fn calculate_next_wakeup(&self) -> Duration {
        self.queues
            .values()
            .filter(|q| !q.pending_files.is_empty())
            .map(|q| q.next_retry_time.saturating_duration_since(Instant::now()))
            .min()
            .unwrap_or(Duration::from_secs(60)) // Default: check every minute if no queues
    }

    /// Waits until at least one retry is ready, then returns it
    async fn wait_for_ready_retry(&mut self) -> (LayeredEvent, RetryHandle) {
        loop {
            // Try to get a ready retry immediately
            if let Some(result) = self.try_get_ready_retry().await {
                return result;
            }

            // Calculate sleep until next retry becomes ready
            let sleep_duration = self.calculate_next_wakeup();
            tokio::time::sleep(sleep_duration).await;
        }
    }

    /// Mark a retry attempt as successful and remove from queue
    pub async fn mark_success(&mut self, handle: RetryHandle) -> LayerDbResult<()> {
        use telemetry::prelude::*;

        let state = match self
            .queues
            .get_mut(&(handle.cache_name.clone(), handle.backend))
        {
            Some(state) => state,
            None => return Ok(()), // Queue was removed, nothing to do
        };

        let file_path = state.retry_dir.join(&handle.filename);

        // Delete file from disk
        if let Err(err) = delete_retry_file(&file_path).await {
            warn!(
                cache.name = %handle.cache_name,
                backend = ?handle.backend,
                filename = ?handle.filename,
                error = %err,
                "failed to delete retry file after success"
            );
        }

        // Reset backoff on success
        state.record_success(self.config.initial_backoff);
        let queue_depth = state.pending_files.len();

        info!(
            cache.name = %handle.cache_name,
            backend = ?handle.backend,
            filename = ?handle.filename,
            queue.depth = queue_depth,
            "retry successful, removed from queue"
        );

        monotonic!(
            layer_cache_retry_queue_success = 1,
            cache_name = &handle.cache_name,
            backend = handle.backend.as_ref()
        );
        gauge!(
            layer_cache_retry_queue_depth = queue_depth,
            cache_name = &handle.cache_name,
            backend = handle.backend.as_ref()
        );

        Ok(())
    }

    /// Mark a retry attempt as failed and update backoff
    pub fn mark_failure(&mut self, handle: RetryHandle, error: &LayerDbError) {
        use telemetry::prelude::*;

        let state = match self
            .queues
            .get_mut(&(handle.cache_name.clone(), handle.backend))
        {
            Some(state) => state,
            None => return, // Queue was removed, nothing to do
        };

        // Re-add file to pending set since retry failed
        state.pending_files.insert(handle.filename.clone());

        // Update backoff
        state.record_failure(
            self.config.max_backoff,
            self.config.backoff_multiplier,
            self.config.initial_backoff,
        );

        let error_type = format!("{error:?}")
            .split('(')
            .next()
            .unwrap_or("Unknown")
            .to_string();

        warn!(
            cache.name = %handle.cache_name,
            backend = ?handle.backend,
            filename = ?handle.filename,
            retry.backoff_ms = state.current_backoff.as_millis(),
            error = %error,
            error.type = %error_type,
            "retry failed, will retry after backoff"
        );

        monotonic!(
            layer_cache_retry_queue_failed = 1,
            cache_name = &handle.cache_name,
            backend = handle.backend.as_ref()
        );
    }

    /// Mark a retry attempt as permanently failed and remove from queue
    pub async fn mark_permanent_failure(&mut self, handle: RetryHandle) {
        use telemetry::prelude::*;

        let state = match self
            .queues
            .get_mut(&(handle.cache_name.clone(), handle.backend))
        {
            Some(state) => state,
            None => return, // Queue was removed, nothing to do
        };

        let file_path = state.retry_dir.join(&handle.filename);

        // Delete file from disk
        if let Err(err) = delete_retry_file(&file_path).await {
            warn!(
                cache.name = %handle.cache_name,
                backend = ?handle.backend,
                filename = ?handle.filename,
                error = %err,
                "failed to delete retry file after permanent failure"
            );
        }

        let queue_depth = state.pending_files.len();

        warn!(
            cache.name = %handle.cache_name,
            backend = ?handle.backend,
            filename = ?handle.filename,
            queue.depth = queue_depth,
            "retry permanently failed (non-retryable error), removed from queue"
        );

        monotonic!(
            layer_cache_retry_queue_permanent_failure = 1,
            cache_name = &handle.cache_name,
            backend = handle.backend.as_ref()
        );
        gauge!(
            layer_cache_retry_queue_depth = queue_depth,
            cache_name = &handle.cache_name,
            backend = handle.backend.as_ref()
        );
    }
}
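For orientation, here is a minimal wiring sketch (not part of the source file) showing how a caller such as `PersisterTask` might start the manager, based on the signatures above. The helper name `spawn_retry_manager` and the cache names passed to `scan_existing_queues` are illustrative assumptions.

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

// Hypothetical wiring sketch: starts the RetryQueueManager as its own task and
// returns the two channel ends the persister would hold on to.
async fn spawn_retry_manager(
    config: RetryQueueConfig,
    shutdown_token: CancellationToken,
) -> LayerDbResult<(
    mpsc::UnboundedSender<RetryQueueMessage>,
    mpsc::UnboundedReceiver<(LayeredEvent, RetryHandle)>,
)> {
    // Messages from the persister into the manager (Enqueue / MarkSuccess / ...).
    let (queue_tx, queue_rx) = mpsc::unbounded_channel();
    // Events the manager hands back when a queued write is ready to retry.
    let (ready_tx, ready_rx) = mpsc::unbounded_channel();

    let mut manager = RetryQueueManager::new(config);
    // Pick up queues left on disk by a previous run; these retry immediately.
    // The cache names here are illustrative placeholders.
    manager
        .scan_existing_queues(&["cas", "workspace_snapshot"])
        .await?;

    // The manager runs until the cancellation token fires.
    tokio::spawn(manager.run(queue_rx, ready_tx, shutdown_token));

    Ok((queue_tx, ready_rx))
}
```

From there, the persister would send `RetryQueueMessage::Enqueue` whenever `is_retryable_error` returns true for a failed write, and answer each `(event, handle)` pair received on the ready channel with `MarkSuccess`, `MarkRetryableFailure`, or `MarkPermanentFailure` after attempting the write again.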
