Skip to main content
Glama
s3_write_queue.rs21.8 kB
use std::{ fs, path::{ Path, PathBuf, }, }; use telemetry::tracing; use ulid::Ulid; use crate::event::LayeredEvent; /// Errors that can occur during S3WriteQueue operations #[derive(Debug, thiserror::Error)] pub enum S3WriteQueueError { #[error("Failed to serialize event")] SerializationFailed(#[source] postcard::Error), #[error("Failed to deserialize event from file {file_path}")] DeserializationFailed { file_path: PathBuf, #[source] source: postcard::Error, }, #[error("Invalid filename: expected valid UTF-8 string")] InvalidFilename { path: PathBuf }, #[error("Invalid ULID in filename: {filename}")] InvalidUlid { filename: String, #[source] source: ulid::DecodeError, }, #[error("IO error")] Io(#[from] std::io::Error), #[error("S3 configuration error: {message}")] Configuration { message: String }, #[error("Scan encountered {error_count} corrupted files, moved to dead letter queue")] ScanWithErrors { error_count: usize }, } #[derive(Debug)] pub struct S3WriteQueue { queue_dir: PathBuf, dlq_dir: PathBuf, } impl S3WriteQueue { /// Creates a new S3WriteQueue with persistent disk storage. /// Directory structure: {base_path}/{cache_name}_s3_queue/ /// Dead letter queue: {base_path}/{cache_name}_s3_queue/dead_letter/ pub fn new( base_path: impl AsRef<Path>, cache_name: impl AsRef<str>, ) -> Result<Self, S3WriteQueueError> { let queue_dir = base_path .as_ref() .join(format!("{}_s3_queue", cache_name.as_ref())); let dlq_dir = queue_dir.join("dead_letter"); // Create directories if they don't exist fs::create_dir_all(&queue_dir)?; fs::create_dir_all(&dlq_dir)?; Ok(Self { queue_dir, dlq_dir }) } /// Enqueues a LayeredEvent to disk with ULID-based filename. /// Returns the ULID for tracking. /// File is atomically written to ensure durability. pub fn enqueue(&self, event: &LayeredEvent) -> Result<Ulid, S3WriteQueueError> { let ulid = Ulid::new(); let file_path = self.queue_dir.join(format!("{ulid}.pending")); // Serialize event let bytes = postcard::to_stdvec(event).map_err(S3WriteQueueError::SerializationFailed)?; // Write atomically (write to temp, then rename) let temp_path = self.queue_dir.join(format!("{ulid}.tmp")); fs::write(&temp_path, &bytes)?; fs::rename(&temp_path, &file_path)?; Ok(ulid) } /// Scans queue directory for pending writes, returns them in ULID order. /// Used on startup to load existing queue. /// Corrupted files are automatically moved to the dead letter queue. /// Returns an error only if no files could be processed due to corrupted data. pub fn scan(&self) -> Result<Vec<(Ulid, LayeredEvent)>, S3WriteQueueError> { let mut entries = Vec::new(); let mut errors = Vec::new(); for entry in fs::read_dir(&self.queue_dir)? { let entry = entry?; let path = entry.path(); // Only process .pending files if path.extension().and_then(|s| s.to_str()) != Some("pending") { continue; } // Process this file, collecting errors instead of failing match self.scan_single_file(&path) { Ok((ulid, event)) => { entries.push((ulid, event)); } Err(err) => { // Log the error and attempt to move to DLQ tracing::error!("Failed to process queue file {}: {}", path.display(), err); errors.push(err); // Try to move corrupted file to DLQ // Extract ULID from path for DLQ naming, or use a new ULID if that fails let ulid = path .file_stem() .and_then(|s| s.to_str()) .and_then(|s| Ulid::from_string(s).ok()) .unwrap_or_else(Ulid::new); if let Err(dlq_err) = self.move_corrupted_file_to_dlq(&path, ulid) { tracing::error!( "Failed to move corrupted file {} to DLQ: {}", path.display(), dlq_err ); } } } } // Sort by ULID (chronological order) entries.sort_by_key(|(ulid, _)| *ulid); // Report aggregate errors if any occurred if !errors.is_empty() { tracing::warn!( "Scan completed with {} corrupted files moved to dead letter queue", errors.len() ); // Return error if ALL files were corrupted (nothing successfully loaded) if entries.is_empty() { return Err(S3WriteQueueError::ScanWithErrors { error_count: errors.len(), }); } } Ok(entries) } /// Processes a single pending file during scan fn scan_single_file(&self, path: &Path) -> Result<(Ulid, LayeredEvent), S3WriteQueueError> { // Parse ULID from filename let filename = path.file_stem().and_then(|s| s.to_str()).ok_or_else(|| { S3WriteQueueError::InvalidFilename { path: path.to_path_buf(), } })?; let ulid = Ulid::from_string(filename).map_err(|source| S3WriteQueueError::InvalidUlid { filename: filename.to_string(), source, })?; // Read and deserialize event let bytes = fs::read(path)?; let event: LayeredEvent = postcard::from_bytes(&bytes).map_err(|source| { S3WriteQueueError::DeserializationFailed { file_path: path.to_path_buf(), source, } })?; Ok((ulid, event)) } /// Moves a corrupted file to the dead letter queue (used during scan) fn move_corrupted_file_to_dlq(&self, path: &Path, ulid: Ulid) -> Result<(), S3WriteQueueError> { let dest_path = self.dlq_dir.join(format!("{ulid}.corrupted")); fs::rename(path, &dest_path)?; Ok(()) } /// Removes a completed write from the queue. /// Succeeds even if file doesn't exist (idempotent). pub fn remove(&self, ulid: Ulid) -> Result<(), S3WriteQueueError> { let file_path = self.queue_dir.join(format!("{ulid}.pending")); match fs::remove_file(&file_path) { Ok(()) => Ok(()), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), // Idempotent Err(e) => Err(e.into()), } } /// Moves a corrupted write to the dead letter queue. /// Preserves the event data for debugging. pub fn move_to_dlq( &self, ulid: Ulid, error: &S3WriteQueueError, ) -> Result<(), S3WriteQueueError> { let source_path = self.queue_dir.join(format!("{ulid}.pending")); let dest_path = self.dlq_dir.join(format!("{ulid}.corrupted")); // Log error details at ERROR level as specified in architecture tracing::error!( "Moving corrupted queue file {} to dead letter queue. Error: {}", ulid, error ); // Move file to DLQ fs::rename(&source_path, &dest_path)?; Ok(()) } /// Returns current queue depth (number of pending writes) pub fn depth(&self) -> usize { match self.scan() { Ok(items) => items.len(), Err(_) => 0, // Return 0 on error rather than failing } } } #[cfg(test)] mod tests { use std::sync::Arc; use chrono::Utc; use si_events::{ Actor, ChangeSetId, Tenancy, WorkspacePk, }; use tempfile::TempDir; use super::*; use crate::event::{ LayeredEventKind, LayeredEventMetadata, }; fn create_test_event() -> LayeredEvent { LayeredEvent { event_id: crate::event::LayeredEventId::new(), event_kind: LayeredEventKind::Raw, key: Arc::from("test_key"), metadata: LayeredEventMetadata { tenancy: Tenancy::new(WorkspacePk::new(), ChangeSetId::new()), actor: Actor::System, timestamp: Utc::now(), }, payload: crate::event::LayeredEventPayload { db_name: Arc::new("test_db".to_string()), key: Arc::from("test_key"), sort_key: Arc::new("test_sort".to_string()), value: Arc::new(vec![1, 2, 3, 4]), }, web_events: None, } } #[test] fn test_new_creates_queue_directory() { let temp_dir = TempDir::new().unwrap(); let base_path = temp_dir.path(); let _queue = S3WriteQueue::new(base_path, "test_cache").unwrap(); // Queue directory should exist let expected_dir = base_path.join("test_cache_s3_queue"); assert!(expected_dir.exists()); assert!(expected_dir.is_dir()); } #[test] fn test_new_creates_dlq_directory() { let temp_dir = TempDir::new().unwrap(); let base_path = temp_dir.path(); let _queue = S3WriteQueue::new(base_path, "test_cache").unwrap(); // DLQ directory should exist let expected_dlq = base_path.join("test_cache_s3_queue").join("dead_letter"); assert!(expected_dlq.exists()); assert!(expected_dlq.is_dir()); } #[test] fn test_enqueue_creates_pending_file() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let event = create_test_event(); let ulid = queue.enqueue(&event).unwrap(); // Check file exists with correct name let file_path = temp_dir .path() .join("test_s3_queue") .join(format!("{ulid}.pending")); assert!(file_path.exists()); assert!(file_path.is_file()); } #[test] fn test_enqueue_serializes_event() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let event = create_test_event(); let ulid = queue.enqueue(&event).unwrap(); // Read file and deserialize let file_path = temp_dir .path() .join("test_s3_queue") .join(format!("{ulid}.pending")); let bytes = std::fs::read(&file_path).unwrap(); let deserialized: LayeredEvent = postcard::from_bytes(&bytes).unwrap(); assert_eq!(deserialized.payload.db_name, event.payload.db_name); } #[test] fn test_scan_empty_queue() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let items = queue.scan().unwrap(); assert_eq!(items.len(), 0); } #[test] fn test_scan_returns_all_pending_files() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let event1 = create_test_event(); let event2 = create_test_event(); let event3 = create_test_event(); queue.enqueue(&event1).unwrap(); queue.enqueue(&event2).unwrap(); queue.enqueue(&event3).unwrap(); let items = queue.scan().unwrap(); assert_eq!(items.len(), 3); } #[test] fn test_scan_returns_ulid_sorted_order() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let event = create_test_event(); // Enqueue with slight delays to ensure different ULIDs let ulid1 = queue.enqueue(&event).unwrap(); std::thread::sleep(std::time::Duration::from_millis(2)); let ulid2 = queue.enqueue(&event).unwrap(); std::thread::sleep(std::time::Duration::from_millis(2)); let ulid3 = queue.enqueue(&event).unwrap(); let items = queue.scan().unwrap(); // Should be in ULID order (chronological) assert_eq!(items[0].0, ulid1); assert_eq!(items[1].0, ulid2); assert_eq!(items[2].0, ulid3); } #[test] fn test_scan_ignores_non_pending_files() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let event = create_test_event(); queue.enqueue(&event).unwrap(); // Create a non-pending file in the queue directory let queue_dir = temp_dir.path().join("test_s3_queue"); std::fs::write(queue_dir.join("readme.txt"), "test").unwrap(); std::fs::write(queue_dir.join("other.tmp"), "test").unwrap(); let items = queue.scan().unwrap(); // Should only find the one .pending file assert_eq!(items.len(), 1); } #[test] fn test_remove_deletes_file() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let event = create_test_event(); let ulid = queue.enqueue(&event).unwrap(); // File should exist let file_path = temp_dir .path() .join("test_s3_queue") .join(format!("{ulid}.pending")); assert!(file_path.exists()); // Remove it queue.remove(ulid).unwrap(); // File should be gone assert!(!file_path.exists()); } #[test] fn test_remove_nonexistent_file_succeeds() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let ulid = Ulid::new(); // Removing non-existent file should not error let result = queue.remove(ulid); assert!(result.is_ok()); } #[test] fn test_move_to_dlq_moves_file() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let event = create_test_event(); let ulid = queue.enqueue(&event).unwrap(); let original_path = temp_dir .path() .join("test_s3_queue") .join(format!("{ulid}.pending")); assert!(original_path.exists()); // Move to DLQ let error = S3WriteQueueError::DeserializationFailed { file_path: original_path.clone(), source: postcard::Error::DeserializeUnexpectedEnd, }; queue.move_to_dlq(ulid, &error).unwrap(); // Original should be gone assert!(!original_path.exists()); // Should be in DLQ with .corrupted extension let dlq_path = temp_dir .path() .join("test_s3_queue") .join("dead_letter") .join(format!("{ulid}.corrupted")); assert!(dlq_path.exists()); } #[test] fn test_move_to_dlq_preserves_event_data() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); let event = create_test_event(); let ulid = queue.enqueue(&event).unwrap(); let original_path = temp_dir .path() .join("test_s3_queue") .join(format!("{ulid}.pending")); let error = S3WriteQueueError::DeserializationFailed { file_path: original_path, source: postcard::Error::DeserializeUnexpectedEnd, }; queue.move_to_dlq(ulid, &error).unwrap(); // Read from DLQ and verify data preserved let dlq_path = temp_dir .path() .join("test_s3_queue") .join("dead_letter") .join(format!("{ulid}.corrupted")); let bytes = std::fs::read(&dlq_path).unwrap(); let deserialized: LayeredEvent = postcard::from_bytes(&bytes).unwrap(); assert_eq!(deserialized.payload.db_name, event.payload.db_name); } #[test] fn test_scan_handles_corrupted_file_gracefully() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); // Enqueue valid events let event1 = create_test_event(); let event2 = create_test_event(); let ulid1 = queue.enqueue(&event1).unwrap(); std::thread::sleep(std::time::Duration::from_millis(2)); let ulid2 = queue.enqueue(&event2).unwrap(); // Create a corrupted file let corrupted_ulid = Ulid::new(); let corrupted_path = temp_dir .path() .join("test_s3_queue") .join(format!("{corrupted_ulid}.pending")); std::fs::write(&corrupted_path, b"corrupted data").unwrap(); // Scan should succeed and return the valid events let items = queue.scan().unwrap(); // Should have loaded the 2 valid events assert_eq!(items.len(), 2); assert_eq!(items[0].0, ulid1); assert_eq!(items[1].0, ulid2); // Corrupted file should be moved to DLQ assert!(!corrupted_path.exists()); let dlq_path = temp_dir .path() .join("test_s3_queue") .join("dead_letter") .join(format!("{corrupted_ulid}.corrupted")); assert!(dlq_path.exists()); } #[test] fn test_scan_with_invalid_filename() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); // Enqueue a valid event let event = create_test_event(); let ulid = queue.enqueue(&event).unwrap(); // Create a file with invalid ULID in filename let invalid_path = temp_dir .path() .join("test_s3_queue") .join("not-a-ulid.pending"); std::fs::write(&invalid_path, b"some data").unwrap(); // Scan should succeed and return the valid event let items = queue.scan().unwrap(); // Should have loaded the 1 valid event assert_eq!(items.len(), 1); assert_eq!(items[0].0, ulid); // Invalid file should be moved to DLQ assert!(!invalid_path.exists()); // Check that something was moved to DLQ let dlq_dir = temp_dir.path().join("test_s3_queue").join("dead_letter"); let dlq_entries: Vec<_> = std::fs::read_dir(&dlq_dir) .unwrap() .filter_map(Result::ok) .collect(); assert!(!dlq_entries.is_empty(), "Expected corrupted file in DLQ"); } #[test] fn test_scan_all_files_corrupted() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); // Create multiple corrupted files for i in 0..3 { let corrupted_ulid = Ulid::new(); let corrupted_path = temp_dir .path() .join("test_s3_queue") .join(format!("{corrupted_ulid}.pending")); std::fs::write(&corrupted_path, format!("corrupted data {i}")).unwrap(); std::thread::sleep(std::time::Duration::from_millis(1)); } // Scan should fail with ScanWithErrors since all files are corrupted let result = queue.scan(); assert!(result.is_err()); match result { Err(S3WriteQueueError::ScanWithErrors { error_count }) => { assert_eq!(error_count, 3); } _ => panic!("Expected ScanWithErrors"), } // All corrupted files should be in DLQ let dlq_dir = temp_dir.path().join("test_s3_queue").join("dead_letter"); let dlq_count = std::fs::read_dir(&dlq_dir).unwrap().count(); assert_eq!(dlq_count, 3); } #[test] fn test_scan_mixed_valid_and_corrupted() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); // Create a mix of valid and corrupted files let event1 = create_test_event(); let ulid1 = queue.enqueue(&event1).unwrap(); let corrupted_ulid = Ulid::new(); let corrupted_path = temp_dir .path() .join("test_s3_queue") .join(format!("{corrupted_ulid}.pending")); std::fs::write(&corrupted_path, b"corrupted").unwrap(); let event2 = create_test_event(); let ulid2 = queue.enqueue(&event2).unwrap(); // Scan should succeed with a warning let items = queue.scan().unwrap(); // Should have loaded the 2 valid events assert_eq!(items.len(), 2); // Verify the valid events are present let ulids: Vec<_> = items.iter().map(|(ulid, _)| *ulid).collect(); assert!(ulids.contains(&ulid1)); assert!(ulids.contains(&ulid2)); // Corrupted file should be in DLQ assert!(!corrupted_path.exists()); let dlq_path = temp_dir .path() .join("test_s3_queue") .join("dead_letter") .join(format!("{corrupted_ulid}.corrupted")); assert!(dlq_path.exists()); } #[test] fn test_depth_returns_count() { let temp_dir = TempDir::new().unwrap(); let queue = S3WriteQueue::new(temp_dir.path(), "test").unwrap(); assert_eq!(queue.depth(), 0); queue.enqueue(&create_test_event()).unwrap(); assert_eq!(queue.depth(), 1); queue.enqueue(&create_test_event()).unwrap(); assert_eq!(queue.depth(), 2); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server