s3_queue_processor.rs
use std::{
    sync::Arc,
    time::Duration,
};

use aws_sdk_s3::Client;
use futures::FutureExt;
use telemetry::prelude::*;
use telemetry_utils::{
    gauge,
    histogram,
    monotonic,
};
use tokio::{
    sync::Notify,
    time::sleep,
};
use ulid::Ulid;

use crate::{
    error::{
        AwsSdkError,
        LayerDbError,
        S3Error,
        S3Operation,
    },
    event::LayeredEvent,
    rate_limiter::{
        RateLimitConfig,
        RateLimiter,
    },
    s3::categorize_s3_error,
    s3_write_queue::{
        S3WriteQueue,
        S3WriteQueueError,
    },
};

pub struct S3QueueProcessor {
    queue: Arc<S3WriteQueue>,
    rate_limiter: RateLimiter,
    s3_client: Client,
    bucket_name: String,
    cache_name: String,
    shutdown: Arc<Notify>,
}

impl S3QueueProcessor {
    pub fn new(
        queue: Arc<S3WriteQueue>,
        rate_limiter_config: RateLimitConfig,
        s3_client: Client,
        bucket_name: String,
        cache_name: String,
    ) -> Self {
        Self {
            queue,
            rate_limiter: RateLimiter::new(rate_limiter_config),
            s3_client,
            bucket_name,
            cache_name,
            shutdown: Arc::new(Notify::new()),
        }
    }

    pub fn shutdown_handle(&self) -> Arc<Notify> {
        Arc::clone(&self.shutdown)
    }

    fn record_metrics(&self) {
        let depth = self.queue.depth();
        let backoff_ms = self.rate_limiter.current_delay().as_millis() as u64;
        gauge!(
            s3_write_queue_depth = depth,
            cache_name = &self.cache_name,
            backend = "s3"
        );
        gauge!(
            s3_write_backoff_ms = backoff_ms,
            cache_name = &self.cache_name,
            backend = "s3"
        );
    }

    fn record_write_attempt(&self, result: &str) {
        monotonic!(
            s3_write_attempts_total = 1,
            cache_name = &self.cache_name,
            backend = "s3",
            result = result
        );
    }

    fn record_write_duration(&self, duration_ms: u64) {
        histogram!(
            s3_write_duration_ms = duration_ms,
            cache_name = &self.cache_name,
            backend = "s3"
        );
    }

    pub async fn process_queue(mut self) {
        loop {
            // Check for shutdown signal (non-blocking)
            if self.shutdown.notified().now_or_never().is_some() {
                debug!(cache = %self.cache_name, "S3QueueProcessor received shutdown signal, exiting");
                break;
            }

            // Record metrics before processing
            self.record_metrics();

            // Apply backoff delay if any
            let delay = self.rate_limiter.current_delay();
            if delay > Duration::ZERO {
                sleep(delay).await;
            }

            // Scan queue for next item
            let items = match self.queue.scan() {
                Ok(items) => items,
                Err(e) => {
                    error!(cache = %self.cache_name, "Failed to scan S3 write queue: {}", e);
                    sleep(Duration::from_secs(1)).await; // Brief pause before retry
                    continue;
                }
            };

            // Queue empty - wait briefly then check again
            if items.is_empty() {
                sleep(Duration::from_millis(100)).await;
                continue;
            }

            // Process oldest item (first in ULID order)
            let (ulid, event) = items.into_iter().next().unwrap();
            self.process_item(ulid, event).await;
        }
    }

    #[instrument(
        skip(self, event),
        fields(
            cache = %self.cache_name,
            ulid = %ulid,
            backoff_ms = self.rate_limiter.current_delay().as_millis(),
            queue_depth = self.queue.depth(),
            result = tracing::field::Empty,
        )
    )]
    async fn process_item(&mut self, ulid: Ulid, event: LayeredEvent) {
        trace!("Processing S3 write from queue");
        let span = Span::current();

        // Key is already transformed - write directly to S3
        // Event.key contains the final S3 key after transformation
        let result = self
            .s3_client
            .put_object()
            .bucket(&self.bucket_name)
            .key(event.key.as_ref()) // Pre-transformed key from queue
            .body(Arc::unwrap_or_clone(event.payload.value).into())
            .send()
            .await;

        // Categorize result for metrics and rate limiter
        let categorized_result = result.map_err(|sdk_err| {
            // Convert AWS SDK error to categorized S3Error
            let aws_error = AwsSdkError::PutObject(sdk_err);
            let s3_error = categorize_s3_error(
                aws_error,
                S3Operation::Put,
                self.cache_name.clone(),
                event.key.to_string(),
            );
            LayerDbError::S3(Box::new(s3_error))
        });

        match categorized_result {
            Ok(_) => {
                span.record("result", "success");
                self.handle_success(ulid).await
            }
            Err(s3_error) => {
                // Classify the error to record appropriate result
                let result_str = match &s3_error {
                    LayerDbError::S3(boxed_error) => match boxed_error.as_ref() {
                        S3Error::Throttling { .. } => "throttle",
                        S3Error::Configuration { .. } => "error_configuration",
                        _ => "error_transient",
                    },
                    _ => "error_transient",
                };
                span.record("result", result_str);
                self.handle_error(ulid, &s3_error).await
            }
        }
    }

    async fn handle_success(&mut self, ulid: Ulid) {
        // Calculate duration from ULID timestamp to now
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        let ulid_ms = ulid.timestamp_ms();
        let duration_ms = now_ms.saturating_sub(ulid_ms);

        // Remove from queue
        if let Err(e) = self.queue.remove(ulid) {
            error!(cache = %self.cache_name, ulid = %ulid, error = %e, "Failed to remove completed write from queue");
        }

        trace!(cache = %self.cache_name, ulid = %ulid, delay_ms = self.rate_limiter.current_delay().as_millis(), "Completed S3 write");

        // Record metrics
        self.record_write_attempt("success");
        self.record_write_duration(duration_ms);

        // Update rate limiter
        self.rate_limiter.on_success();
        if self.rate_limiter.should_reduce_backoff() {
            let old_delay = self.rate_limiter.current_delay();
            self.rate_limiter.reduce_backoff();
            let new_delay = self.rate_limiter.current_delay();
            debug!(
                cache = %self.cache_name,
                old_delay_ms = old_delay.as_millis(),
                new_delay_ms = new_delay.as_millis(),
                consecutive_successes = self.rate_limiter.consecutive_successes(),
                "S3 backoff reduced after successes"
            );
        }
    }

    async fn handle_error(&mut self, ulid: Ulid, error: &LayerDbError) {
        // Extract S3Error if this is an S3 error
        let s3_error = match error {
            LayerDbError::S3(boxed_error) => boxed_error.as_ref(),
            _ => {
                // Non-S3 errors are treated as transient
                warn!(
                    cache = %self.cache_name,
                    ulid = %ulid,
                    error = %error,
                    "S3 write failed with non-S3 error, will retry"
                );
                self.rate_limiter.on_throttle();
                return;
            }
        };

        // Classify based on S3Error variant
        match s3_error {
            S3Error::Throttling { .. } => {
                self.record_write_attempt("throttle");
                let old_delay = self.rate_limiter.current_delay();
                self.rate_limiter.on_throttle();
                let new_delay = self.rate_limiter.current_delay();
                debug!(
                    cache = %self.cache_name,
                    ulid = %ulid,
                    old_delay_ms = old_delay.as_millis(),
                    new_delay_ms = new_delay.as_millis(),
                    "S3 rate limited, increasing backoff"
                );
                // Leave in queue, will retry
            }
            S3Error::Configuration { message, .. } => {
                self.record_write_attempt("error_configuration");
                // Configuration errors are non-retryable, similar to serialization errors
                error!(
                    cache = %self.cache_name,
                    ulid = %ulid,
                    error = %error,
                    "S3 write failed: configuration error, moving to DLQ"
                );
                if let Err(e) = self.queue.move_to_dlq(
                    ulid,
                    &S3WriteQueueError::Configuration {
                        message: message.clone(),
                    },
                ) {
                    error!(cache = %self.cache_name, ulid = %ulid, dlq_error = %e, "Failed to move corrupted write to DLQ");
                }
            }
            _ => {
                self.record_write_attempt("error_transient");
                // All other errors (Network, Authentication, NotFound, Other) are transient
                warn!(
                    cache = %self.cache_name,
                    ulid = %ulid,
                    error = %error,
                    "S3 write failed with transient error, will retry"
                );
                // Treat like throttle - increase backoff
                self.rate_limiter.on_throttle();
                // Leave in queue, will retry
            }
        }
    }
}
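For context, here is a minimal wiring sketch of how a caller might construct and spawn this processor. It is not taken from the repository: it assumes a tokio runtime, that an S3WriteQueue has already been built elsewhere, and that RateLimitConfig implements Default; the bucket and cache names are placeholders.

// Hypothetical caller-side wiring for S3QueueProcessor (sketch, not repository code).
use std::sync::Arc;

async fn spawn_s3_queue_processor(queue: Arc<S3WriteQueue>) -> Arc<tokio::sync::Notify> {
    // Standard aws-config flow: credentials and region from the environment.
    let aws_config = aws_config::load_from_env().await;
    let s3_client = aws_sdk_s3::Client::new(&aws_config);

    let processor = S3QueueProcessor::new(
        queue,
        RateLimitConfig::default(),     // assumed default backoff settings
        s3_client,
        "my-layer-cache-bucket".into(), // hypothetical bucket name
        "object_cache".into(),          // hypothetical cache name
    );

    // process_queue(mut self) consumes the processor, so take the shutdown
    // handle before handing ownership to the background task.
    let shutdown = processor.shutdown_handle();
    tokio::spawn(processor.process_queue());
    shutdown
}

// Later, during service shutdown:
// shutdown.notify_one();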
