Skip to main content
Glama
s3.rs31.4 kB
use std::{ collections::HashMap, fmt, path::Path, sync::Arc, }; use aws_config::Region; use aws_credential_types::Credentials; use aws_sdk_s3::Client; use serde::{ Deserialize, Serialize, }; use si_std::SensitiveString; use strum::AsRefStr; use telemetry::prelude::*; use tokio::task::JoinHandle; use crate::{ error::{ LayerDbError, LayerDbResult, }, event::{ LayeredEvent, LayeredEventPayload, }, rate_limiter::RateLimitConfig, s3_queue_processor::S3QueueProcessor, s3_write_queue::S3WriteQueue, }; /// Strategy for transforming keys before storage in S3 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum KeyTransformStrategy { /// No transformation - use key as-is /// Use for content-addressable keys (hashes) that are already well-distributed Passthrough, /// Reverse the entire key string for even distribution /// Use for ULID-based keys where timestamp prefix causes hotspotting ReverseKey, } impl KeyTransformStrategy { /// Apply transformation to a key pub fn transform(&self, key: &str) -> String { match self { Self::Passthrough => key.to_string(), Self::ReverseKey => key.chars().rev().collect(), } } } impl Default for KeyTransformStrategy { fn default() -> Self { Self::Passthrough } } /// S3 authentication configuration #[derive(Clone, AsRefStr, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum S3AuthConfig { /// Static credentials (for local dev, MinIO, etc.) StaticCredentials { access_key: SensitiveString, secret_key: SensitiveString, }, /// IAM role-based authentication (for production AWS) /// Uses AWS SDK's default credential provider chain IamRole, } impl fmt::Debug for S3AuthConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(self.as_ref()) } } /// Resolved S3 configuration for a specific cache #[derive(Debug, Clone)] pub struct S3CacheConfig { /// S3 endpoint URL (e.g., `http://localhost:9200` or `https://s3.us-west-2.amazonaws.com`) pub endpoint: String, /// Complete bucket name for this cache pub bucket_name: String, /// AWS region (e.g., `us-east-1`) pub region: String, /// Authentication method pub auth: S3AuthConfig, /// Optional key prefix for test isolation pub key_prefix: Option<String>, } /// Configuration for S3 read operation retry behavior /// /// Controls AWS SDK retry configuration for S3Layer read operations (get, head_bucket). /// Write operations use application-level retry via queue and have SDK retry disabled. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct S3ReadRetryConfig { /// Maximum number of retry attempts (default: 3) pub max_attempts: u32, /// Initial backoff delay in milliseconds (default: 100) pub initial_backoff_ms: u64, /// Maximum backoff delay in milliseconds (default: 20000) pub max_backoff_ms: u64, /// Backoff multiplier for exponential backoff (default: 2.0) pub backoff_multiplier: f64, } impl Default for S3ReadRetryConfig { fn default() -> Self { Self { max_attempts: 3, initial_backoff_ms: 100, max_backoff_ms: 20000, backoff_multiplier: 2.0, } } } /// Configuration for S3-compatible object storage. /// /// Supports AWS S3 and S3-compatible services (VersityGW, MinIO, etc). /// /// Each cache gets its own bucket: `{bucket_prefix}-{cache_name}[-{bucket_suffix}]`. /// Cache names with underscores are converted to hyphens for DNS compliance. /// /// # Test Isolation /// /// The `key_prefix` field provides namespace isolation for concurrent tests. /// When set, all object keys are prefixed with this value after key transformation /// and distribution prefixing. This mirrors the NATS `subject_prefix` pattern. /// /// Example key transformation with prefix: /// - Original key: `"abc123def456"` /// - After Passthrough: `"abc123def456"` /// - After three-tier: `"ab/c1/23/abc123def456"` /// - After test prefix: `"test-uuid-1234/ab/c1/23/abc123def456"` #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ObjectStorageConfig { /// S3 endpoint URL (e.g., `http://localhost:9200` or `https://s3.us-west-2.amazonaws.com`) pub endpoint: String, /// Bucket prefix for all caches (e.g., `si-layer-cache`) pub bucket_prefix: String, /// Optional suffix appended to final bucket name (e.g., `production`, `tools`) /// Final bucket: `{bucket_prefix}-{cache_name}[-{bucket_suffix}]` pub bucket_suffix: Option<String>, /// AWS region (e.g., `us-east-1`) pub region: String, /// Authentication method pub auth: S3AuthConfig, /// Optional key prefix for test isolation /// When set, all object keys are prefixed with this value after transformation pub key_prefix: Option<String>, /// Rate limiting configuration for S3 writes #[serde(default)] pub rate_limit: RateLimitConfig, /// Retry configuration for S3 read operations #[serde(default)] pub read_retry: S3ReadRetryConfig, } impl Default for ObjectStorageConfig { fn default() -> Self { Self { endpoint: "http://localhost:9200".to_string(), bucket_prefix: "si-layer-cache".to_string(), bucket_suffix: None, region: "us-east-1".to_string(), auth: S3AuthConfig::StaticCredentials { access_key: "siadmin".into(), secret_key: "bugbear".into(), }, key_prefix: None, rate_limit: RateLimitConfig::default(), read_retry: S3ReadRetryConfig::default(), } } } impl ObjectStorageConfig { /// Create cache-specific configuration with resolved bucket name /// /// Normalizes cache name for DNS-compliant bucket naming. /// Final bucket name: `{bucket_prefix}-{cache_name}[-{bucket_suffix}]` pub fn for_cache(&self, cache_name: &str) -> S3CacheConfig { // S3 bucket names: lowercase, hyphens allowed, no underscores let normalized_name = cache_name.replace('_', "-"); let bucket_name = match &self.bucket_suffix { Some(suffix) => format!("{}-{}-{}", self.bucket_prefix, normalized_name, suffix), None => format!("{}-{}", self.bucket_prefix, normalized_name), }; S3CacheConfig { endpoint: self.endpoint.clone(), bucket_name, region: self.region.clone(), auth: self.auth.clone(), key_prefix: self.key_prefix.clone(), } } } /// S3-based persistence layer with internal write queue and adaptive rate limiting. /// /// # Architecture /// /// All writes go through a persistent disk queue (no fast path) to guarantee durability: /// /// 1. `insert()` transforms the key and enqueues LayeredEvent to disk atomically /// 2. Background processor dequeues in ULID order /// 3. Processor applies backoff delay if rate limited /// 4. Processor attempts S3 write /// 5. On success: remove from queue, update rate limiter /// 6. On throttle (503): increase backoff, leave in queue /// 7. On serialization error: move to dead letter queue /// 8. On transient error: increase backoff, leave in queue /// /// # S3 Clients /// /// Two independent S3 clients with different retry configurations: /// /// - **S3Layer client**: Configurable SDK retry for resilient synchronous reads /// - **Processor client**: SDK retry disabled (application-level retry via queue) /// /// This separation ensures processor issues don't affect S3Layer reads, and allows /// optimal retry strategies for each use case. /// /// # Key Transformation /// /// Keys are transformed at the API boundary before queueing: /// /// - Strategy transformation (Passthrough or ReverseKey) /// - Three-tier distribution prefix (`ab/c1/23/...`) /// - Optional key_prefix if configured /// /// Events in the queue contain pre-transformed keys ready for S3 storage. /// The processor uses keys directly without transformation. /// /// # Rate Limiting /// /// Adaptive exponential backoff prevents constant S3 throttling: /// /// - **On throttle:** delay *= backoff_multiplier (default: 2.0), capped at max_delay /// - **On success:** consecutive_successes++ /// - **After N successes:** delay /= success_divisor (default: 1.5), streak resets /// - **Below Zeno threshold (50ms):** delay resets to zero (solves dichotomy paradox) /// /// # Configuration /// /// Rate limiting configured via `RateLimitConfig` in `ObjectStorageConfig`: /// /// ```json /// { /// "rate_limit": { /// "min_delay_ms": 0, /// "max_delay_ms": 5000, /// "initial_backoff_ms": 100, /// "backoff_multiplier": 2.0, /// "success_divisor": null, // defaults to multiplier * 0.75 /// "zeno_threshold_ms": 50, /// "successes_before_reduction": 3 /// } /// } /// ``` /// /// # Logging /// /// - Enqueue: TRACE level (normal operation) /// - Success: TRACE level (expected happy path) /// - Throttle: DEBUG level (expected, not alarming) /// - Transient error: WARN level (unexpected but retryable) /// - Serialization error: ERROR level (data corruption) /// /// # Metrics /// /// - `s3_write_queue_depth` - Current pending writes /// - `s3_write_backoff_ms` - Current backoff delay /// - `s3_write_attempts_total{result}` - Attempts by result (success/throttle/error) /// - `s3_write_duration_ms` - Time from enqueue to completion /// /// # Shutdown /// /// On Drop: /// 1. Signals processor to shutdown /// 2. Processor completes in-flight write /// 3. Processor exits (does not drain queue) /// 4. Queue remains on disk for restart /// /// # Startup /// /// On initialization: /// 1. Scans queue directory for `*.pending` files /// 2. Loads pending writes in ULID order /// 3. Starts processor with zero backoff /// 4. Rate limits rediscovered naturally during processing #[derive(Clone, Debug)] pub struct S3Layer { client: Client, bucket_name: String, cache_name: String, strategy: KeyTransformStrategy, key_prefix: Option<String>, write_queue: Arc<S3WriteQueue>, processor_handle: Arc<JoinHandle<()>>, processor_shutdown: Arc<tokio::sync::Notify>, } impl S3Layer { /// Create a new S3Layer from configuration and strategy /// /// All writes go through a persistent queue for durability. The queue processor /// runs in the background and applies adaptive rate limiting if configured. /// /// # Parameters /// /// * `config` - S3 cache configuration /// * `cache_name` - Name of this cache (used for queue directory) /// * `strategy` - Key transformation strategy /// * `rate_limit_config` - Rate limiting configuration for adaptive backoff /// * `read_retry_config` - Retry configuration for S3 read operations /// * `queue_base_path` - Base directory for queue persistence pub async fn new( config: S3CacheConfig, cache_name: impl Into<String>, strategy: KeyTransformStrategy, rate_limit_config: RateLimitConfig, read_retry_config: S3ReadRetryConfig, queue_base_path: impl AsRef<Path>, ) -> LayerDbResult<Self> { info!( layer_db.s3.auth_mode = config.auth.as_ref(), layer_db.s3.bucket_name = config.bucket_name, "Creating S3 layer", ); let sdk_config = match &config.auth { S3AuthConfig::StaticCredentials { access_key, secret_key, } => { // Static credential flow for dev/MinIO let credentials = Credentials::new( access_key.as_str(), secret_key.as_str(), None, // session token None, // expiration "static", // provider name ); info!(endpoint = config.endpoint, "Using S3 endpoint",); aws_config::SdkConfig::builder() .endpoint_url(&config.endpoint) .region(Region::new(config.region.clone())) .credentials_provider( aws_credential_types::provider::SharedCredentialsProvider::new(credentials), ) .behavior_version(aws_config::BehaviorVersion::latest()) .build() } S3AuthConfig::IamRole => { // Use si-aws-config which properly loads credentials, adds retry config, and validates via STS si_aws_config::AwsConfig::from_env() .await .map_err(LayerDbError::AwsConfig)? } }; // VersityGW with POSIX backend requires path-style bucket access let s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config).force_path_style(true); // Apply read retry configuration let retry_config = aws_sdk_s3::config::retry::RetryConfig::standard() .with_max_attempts(read_retry_config.max_attempts) .with_initial_backoff(std::time::Duration::from_millis( read_retry_config.initial_backoff_ms, )) .with_max_backoff(std::time::Duration::from_millis( read_retry_config.max_backoff_ms, )); let s3_config = s3_config_builder.retry_config(retry_config).build(); let client = Client::from_conf(s3_config); // Create separate S3 client for processor with retry disabled // Processor uses application-level retry via queue, so SDK retry must be disabled let processor_s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) .force_path_style(true) .retry_config(aws_sdk_s3::config::retry::RetryConfig::disabled()) .build(); let processor_client = Client::from_conf(processor_s3_config); let cache_name_str = cache_name.into(); let bucket_name = config.bucket_name; let key_prefix = config.key_prefix; // Initialize write queue let write_queue = S3WriteQueue::new(&queue_base_path, &cache_name_str) .map_err(|e| LayerDbError::S3WriteQueue(e.to_string()))?; let write_queue = Arc::new(write_queue); // Start queue processor // Create processor with direct S3 client access (no S3Layer dependency) let processor = S3QueueProcessor::new( Arc::clone(&write_queue), rate_limit_config, processor_client, bucket_name.clone(), cache_name_str.clone(), ); let shutdown = processor.shutdown_handle(); let handle = tokio::spawn(processor.process_queue()); let processor_handle = Arc::new(handle); let processor_shutdown = shutdown; Ok(S3Layer { client, bucket_name, cache_name: cache_name_str, strategy, key_prefix, write_queue, processor_handle, processor_shutdown, }) } /// Get the key transform strategy used by this S3Layer pub fn strategy(&self) -> KeyTransformStrategy { self.strategy } /// Transform and apply three-tier prefix to key, with optional test prefix /// /// Transformation order: /// 1. Apply strategy transformation (e.g., ReverseKey) /// 2. Apply three-tier distribution prefix (ab/c1/23/...) /// 3. Apply test key prefix if present (test-uuid/ab/c1/23/...) /// /// Example with Passthrough: /// "abc123..." -> "ab/c1/23/abc123..." /// Example with Passthrough + test prefix: /// "abc123..." -> "test-uuid-1234/ab/c1/23/abc123..." /// Example with ReverseKey + test prefix: /// "abc123..." -> reverse -> "...321cba" -> "../.3/21/...321cba" -> "test-uuid-1234/../.3/21/...321cba" fn transform_and_prefix_key(&self, key: &str) -> String { // Step 1: Apply strategy transformation (e.g., reverse) let transformed = self.strategy.transform(key); // Step 2: Apply three-tier prefixing for S3 distribution // Takes first 6 chars and creates: XX/YY/ZZ/full_key let with_distribution_prefix = if transformed.len() >= 6 { format!( "{}/{}/{}/{}", &transformed[..2], &transformed[2..4], &transformed[4..6], transformed ) } else if transformed.len() >= 4 { format!( "{}/{}/{}", &transformed[..2], &transformed[2..4], transformed ) } else if transformed.len() >= 2 { format!("{}/{}", &transformed[..2], transformed) } else { transformed }; // Step 3: Apply test key prefix if present (outermost prefix) match &self.key_prefix { Some(prefix) => format!("{prefix}/{with_distribution_prefix}"), None => with_distribution_prefix, } } /// Get a value by key from S3 pub async fn get(&self, key: &str) -> LayerDbResult<Option<Vec<u8>>> { use std::time::Instant; use aws_sdk_s3::{ error::SdkError, operation::get_object::GetObjectError, }; use telemetry_utils::histogram; use crate::error::AwsSdkError; let start = Instant::now(); let s3_key = self.transform_and_prefix_key(key); match self .client .get_object() .bucket(&self.bucket_name) .key(s3_key) .send() .await { Ok(output) => { let bytes = output .body .collect() .await .map_err(|e| { // TODO: Body collection errors could be structured in the future // For now, these are rare (occur after successful HTTP response) and // ByteStreamError is not an SDK operation error, so we use a simple format LayerDbError::ContentConversion(format!( "Failed to collect S3 response body for key '{key}' in cache '{}': {e}", self.cache_name )) })? .to_vec(); histogram!( layer_cache.read_latency_ms = start.elapsed().as_millis() as f64, cache_name = self.cache_name.as_str(), backend = "s3", result = "hit" ); Ok(Some(bytes)) } Err(sdk_err) => { // Check for NoSuchKey error - return None instead of error if let SdkError::ServiceError(err) = &sdk_err { if matches!(err.err(), GetObjectError::NoSuchKey(_)) { histogram!( layer_cache.read_latency_ms = start.elapsed().as_millis() as f64, cache_name = self.cache_name.as_str(), backend = "s3", result = "miss" ); return Ok(None); } } // Also check error string for compatibility with different S3 implementations let error_str = sdk_err.to_string(); if error_str.contains("NoSuchKey") || error_str.contains("404") || error_str.contains("Not Found") { histogram!( layer_cache.read_latency_ms = start.elapsed().as_millis() as f64, cache_name = self.cache_name.as_str(), backend = "s3", result = "miss" ); return Ok(None); } // Not a "not found" error - wrap with context let aws_error = AwsSdkError::GetObject(sdk_err); let s3_error = categorize_s3_error( aws_error, crate::error::S3Operation::Get, self.cache_name.clone(), key.to_string(), ); Err(LayerDbError::S3(Box::new(s3_error))) } } } /// Insert an event into S3 via the write queue /// /// Transforms the key according to the configured strategy and prefix before queueing. /// The queued event contains the final S3 key ready to write. /// /// This is the single write interface - all S3 writes go through the queue for durability. pub fn insert(&self, event: &LayeredEvent) -> LayerDbResult<()> { // Transform key at API boundary let transformed_key = self.transform_and_prefix_key(&event.key); let transformed_key_arc: Arc<str> = Arc::from(transformed_key); // Create new event with transformed key // Most fields are Arc, so clone is cheap let transformed_event = LayeredEvent { event_id: event.event_id, event_kind: event.event_kind, key: transformed_key_arc.clone(), metadata: event.metadata.clone(), payload: LayeredEventPayload { db_name: event.payload.db_name.clone(), key: transformed_key_arc.clone(), // Reuse same Arc sort_key: event.payload.sort_key.clone(), value: event.payload.value.clone(), }, web_events: event.web_events.clone(), }; // Queue the transformed event let ulid = self .write_queue .enqueue(&transformed_event) .map_err(|e| LayerDbError::S3WriteQueue(e.to_string()))?; trace!( cache = %self.cache_name, ulid = %ulid, "Queued S3 write" ); Ok(()) } /// Get multiple values in parallel pub async fn get_bulk(&self, keys: &[&str]) -> LayerDbResult<HashMap<String, Vec<u8>>> { use futures::future::join_all; let futures = keys.iter().map(|&key| async move { let key_string = key.to_string(); match self.get(key).await { Ok(Some(value)) => Some((key_string, value)), Ok(None) => None, Err(_) => None, // Ignore individual errors in bulk fetch } }); let results = join_all(futures).await; Ok(results.into_iter().flatten().collect()) } /// Ensure bucket exists (no schema migrations needed for S3) pub async fn migrate(&self) -> LayerDbResult<()> { use crate::error::AwsSdkError; // Check if bucket exists - buckets should be pre-created by infrastructure // If bucket doesn't exist, this will return an error which should be treated as retryable match self .client .head_bucket() .bucket(&self.bucket_name) .send() .await { Ok(_) => Ok(()), Err(sdk_err) => { let aws_error = AwsSdkError::HeadBucket(sdk_err); let s3_error = categorize_s3_error( aws_error, crate::error::S3Operation::HeadBucket, // Use bucket name as cache name for head_bucket operations self.bucket_name.clone(), String::new(), // No specific key for head_bucket ); Err(LayerDbError::S3(Box::new(s3_error))) } } } } impl Drop for S3Layer { fn drop(&mut self) { // Signal processor to shutdown self.processor_shutdown.notify_one(); // Abort the processor task handle // Note: Can't await in Drop, processor will exit on next loop iteration self.processor_handle.abort(); } } /// Classification of S3 errors for appropriate handling #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum S3ErrorKind { /// Throttling/rate limiting error (SlowDown, RequestLimitExceeded, ServiceUnavailable) Throttle, /// Serialization/construction error (non-retryable) Serialization, /// Transient error (retryable but not throttling) Transient, } impl S3ErrorKind { /// Returns true if this is a throttling error pub fn is_throttle(&self) -> bool { matches!(self, S3ErrorKind::Throttle) } /// Returns true if this is a serialization error pub fn is_serialization(&self) -> bool { matches!(self, S3ErrorKind::Serialization) } /// Returns true if this is a transient error pub fn is_transient(&self) -> bool { matches!(self, S3ErrorKind::Transient) } } /// Categorize AWS SDK error into appropriate S3Error variant with context /// /// Used by both S3Layer (for read operations) and S3QueueProcessor (for write operations) /// to consistently classify S3 errors for retry/backoff decisions. pub fn categorize_s3_error( aws_error: crate::error::AwsSdkError, operation: crate::error::S3Operation, cache_name: String, key: String, ) -> crate::error::S3Error { use crate::error::{ AwsSdkError, S3Error, }; // Extract message from AWS error for display let message = aws_error.to_string(); // Helper to determine error category from status and message let determine_category = |status: u16, error_msg: &str| -> &'static str { if status == 403 || error_msg.contains("AccessDenied") || error_msg.contains("InvalidAccessKeyId") || error_msg.contains("SignatureDoesNotMatch") { "authentication" } else if status == 404 || error_msg.contains("NoSuchBucket") || error_msg.contains("NoSuchKey") { "not_found" } else if status == 503 || error_msg.contains("SlowDown") || error_msg.contains("RequestLimitExceeded") || error_msg.contains("ServiceUnavailable") { "throttling" } else { "other" } }; // Determine error category based on AWS SDK error type // Note: Can't use match arm `|` patterns because each variant has different inner types let category = match &aws_error { AwsSdkError::PutObject(sdk_err) => match sdk_err { aws_sdk_s3::error::SdkError::ServiceError(err) => { let status = err.raw().status().as_u16(); let error_msg = format!("{:?}", err.err()); determine_category(status, &error_msg) } aws_sdk_s3::error::SdkError::TimeoutError(_) | aws_sdk_s3::error::SdkError::DispatchFailure(_) | aws_sdk_s3::error::SdkError::ResponseError(_) => "network", aws_sdk_s3::error::SdkError::ConstructionFailure(_) => "configuration", _ => "other", }, AwsSdkError::GetObject(sdk_err) => match sdk_err { aws_sdk_s3::error::SdkError::ServiceError(err) => { let status = err.raw().status().as_u16(); let error_msg = format!("{:?}", err.err()); determine_category(status, &error_msg) } aws_sdk_s3::error::SdkError::TimeoutError(_) | aws_sdk_s3::error::SdkError::DispatchFailure(_) | aws_sdk_s3::error::SdkError::ResponseError(_) => "network", aws_sdk_s3::error::SdkError::ConstructionFailure(_) => "configuration", _ => "other", }, AwsSdkError::HeadBucket(sdk_err) => match sdk_err { aws_sdk_s3::error::SdkError::ServiceError(err) => { let status = err.raw().status().as_u16(); let error_msg = format!("{:?}", err.err()); determine_category(status, &error_msg) } aws_sdk_s3::error::SdkError::TimeoutError(_) | aws_sdk_s3::error::SdkError::DispatchFailure(_) | aws_sdk_s3::error::SdkError::ResponseError(_) => "network", aws_sdk_s3::error::SdkError::ConstructionFailure(_) => "configuration", _ => "other", }, }; // Build appropriate S3Error variant based on category match category { "authentication" => S3Error::Authentication { operation, cache_name, key, message, source: aws_error, }, "not_found" => S3Error::NotFound { operation, cache_name, key, message, source: aws_error, }, "throttling" => S3Error::Throttling { operation, cache_name, key, message, source: aws_error, }, "network" => S3Error::Network { operation, cache_name, key, message, source: aws_error, }, "configuration" => S3Error::Configuration { operation, cache_name, key, message, source: aws_error, }, _ => S3Error::Other { operation, cache_name, key, message, source: aws_error, }, } } /// Classify an S3 SDK error into an error kind for appropriate handling /// /// # Classification Rules /// /// - **Throttle**: SlowDown, RequestLimitExceeded, ServiceUnavailable (503) /// - **Serialization**: ConstructionFailure (indicates bad request data) /// - **Transient**: All other errors (network, timeout, internal errors, etc.) pub fn classify_s3_error<E>( error: &aws_sdk_s3::error::SdkError<E, http::Response<()>>, ) -> S3ErrorKind where E: std::error::Error, { use aws_sdk_s3::error::SdkError; match error { SdkError::ServiceError(context) => { // Extract HTTP status code as primary classification mechanism let status = context.raw().status().as_u16(); let error_msg = format!("{:?}", context.err()); // Classify based on HTTP status code (503) with message validation if status == 503 || error_msg.contains("SlowDown") || error_msg.contains("RequestLimitExceeded") || error_msg.contains("ServiceUnavailable") { S3ErrorKind::Throttle } else { // All other service errors are transient (InternalError, etc.) S3ErrorKind::Transient } } // AWS SDK serialization/construction errors SdkError::ConstructionFailure(_) => S3ErrorKind::Serialization, // All other errors are transient (network, timeout, etc.) SdkError::ResponseError(_) => S3ErrorKind::Transient, SdkError::TimeoutError(_) => S3ErrorKind::Transient, SdkError::DispatchFailure(_) => S3ErrorKind::Transient, _ => S3ErrorKind::Transient, } } #[cfg(test)] mod tests;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server