Skip to main content
Glama
error.rs (11.5 kB)
use std::{ error, num::TryFromIntError, }; use aws_sdk_s3::{ error::SdkError, operation::{ get_object::GetObjectError, head_bucket::HeadBucketError, put_object::PutObjectError, }, }; use si_data_nats::async_nats::jetstream; use si_data_pg::{ PgError, PgPoolError, }; use si_events::{ ActionId, FuncRunId, content_hash::ContentHashParseError, }; use si_std::CanonicalFileError; use thiserror::Error; use tokio_stream::Elapsed; use crate::{ activities::{ Activity, ActivityId, }, event::LayeredEvent, persister::{ PersistMessage, PersisterTaskError, }, }; /// S3 operation that failed #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum S3Operation { /// Read operation (get_object) Get, /// Write operation (put_object) Put, /// Bucket existence check (head_bucket) HeadBucket, } impl std::fmt::Display for S3Operation { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { S3Operation::Get => write!(f, "Get"), S3Operation::Put => write!(f, "Put"), S3Operation::HeadBucket => write!(f, "HeadBucket"), } } } /// Wrapper for concrete AWS SDK error types #[derive(Debug)] pub enum AwsSdkError { /// Error from put_object operation PutObject(SdkError<PutObjectError>), /// Error from get_object operation GetObject(SdkError<GetObjectError>), /// Error from head_bucket operation HeadBucket(SdkError<HeadBucketError>), } impl std::fmt::Display for AwsSdkError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { AwsSdkError::PutObject(e) => write!(f, "{e}"), AwsSdkError::GetObject(e) => write!(f, "{e}"), AwsSdkError::HeadBucket(e) => write!(f, "{e}"), } } } impl std::error::Error for AwsSdkError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { AwsSdkError::PutObject(e) => Some(e), AwsSdkError::GetObject(e) => Some(e), AwsSdkError::HeadBucket(e) => Some(e), } } } /// Structured S3 error with categorization and context #[derive(Error, Debug)] pub enum S3Error { /// Authentication or authorization 
failure (403, AccessDenied, InvalidAccessKeyId) #[error( "S3 authentication failed for {operation} (cache: {cache_name}, key: {key}): {message}" )] Authentication { operation: S3Operation, cache_name: String, key: String, message: String, #[source] source: AwsSdkError, }, /// Resource not found (404, NoSuchBucket, NoSuchKey) #[error("S3 resource not found for {operation} (cache: {cache_name}, key: {key}): {message}")] NotFound { operation: S3Operation, cache_name: String, key: String, message: String, #[source] source: AwsSdkError, }, /// Rate limiting or throttling (503, SlowDown, RequestLimitExceeded) #[error("S3 throttled for {operation} (cache: {cache_name}, key: {key}): {message}")] Throttling { operation: S3Operation, cache_name: String, key: String, message: String, #[source] source: AwsSdkError, }, /// Network or connection errors (timeout, dispatch failure) #[error("S3 network error for {operation} (cache: {cache_name}, key: {key}): {message}")] Network { operation: S3Operation, cache_name: String, key: String, message: String, #[source] source: AwsSdkError, }, /// Configuration errors (construction failure, invalid parameters) #[error("S3 configuration error for {operation} (cache: {cache_name}, key: {key}): {message}")] Configuration { operation: S3Operation, cache_name: String, key: String, message: String, #[source] source: AwsSdkError, }, /// Uncategorized errors #[error("S3 error for {operation} (cache: {cache_name}, key: {key}): {message}")] Other { operation: S3Operation, cache_name: String, key: String, message: String, #[source] source: AwsSdkError, }, } impl S3Error { /// Get the error kind as a string for metrics/logging pub fn kind(&self) -> &'static str { match self { S3Error::Authentication { .. } => "authentication", S3Error::NotFound { .. } => "not_found", S3Error::Throttling { .. } => "throttling", S3Error::Network { .. } => "network", S3Error::Configuration { .. } => "configuration", S3Error::Other { .. 
} => "other", } } /// Get the key from any error variant pub fn key(&self) -> &str { match self { S3Error::Authentication { key, .. } | S3Error::NotFound { key, .. } | S3Error::Throttling { key, .. } | S3Error::Network { key, .. } | S3Error::Configuration { key, .. } | S3Error::Other { key, .. } => key, } } /// Get the cache name from any error variant pub fn cache_name(&self) -> &str { match self { S3Error::Authentication { cache_name, .. } | S3Error::NotFound { cache_name, .. } | S3Error::Throttling { cache_name, .. } | S3Error::Network { cache_name, .. } | S3Error::Configuration { cache_name, .. } | S3Error::Other { cache_name, .. } => cache_name, } } /// Get the operation from any error variant pub fn operation(&self) -> S3Operation { match self { S3Error::Authentication { operation, .. } | S3Error::NotFound { operation, .. } | S3Error::Throttling { operation, .. } | S3Error::Network { operation, .. } | S3Error::Configuration { operation, .. } | S3Error::Other { operation, .. } => *operation, } } } #[remain::sorted] #[derive(Error, Debug)] pub enum LayerDbError { #[error("attempted to find a bunch by action id, but there wasn't one")] ActionIdNotFound(ActionId), #[error("Activity is not an activity rebase, and should be to be on the work queue")] ActivityRebase, #[error("Activity Event Server send error: {0}")] ActivitySend(#[from] Box<tokio::sync::mpsc::error::SendError<Activity>>), #[error( "While waiting for an activity id {0}, all senders have closed. The activity will never arrive." )] ActivityWaitClosed(ActivityId), #[error("While waiting for an activity id {0}, the receiving stream has lagged. 
Cancelling.")] ActivityWaitLagged(ActivityId), #[error("Timed out waiting for activity id {0} after {1}")] ActivityWaitTimeout(ActivityId, Elapsed), #[error("AWS config error: {0}")] AwsConfig(#[from] si_aws_config::AwsConfigError), #[error("cache update message with bad headers: {0}")] CacheUpdateBadHeaders(String), #[error("cache update message had no headers")] CacheUpdateNoHeaders, #[error("canonical file error: {0}")] CanonicalFile(#[from] CanonicalFileError), #[error("Configuration validation error: {0}")] ConfigValidation(String), #[error("content conversion error: {0}")] ContentConversion(String), #[error("could not convert to key from string")] CouldNotConvertToKeyFromString(String), #[error("decompression error: {0}")] Decompress(String), #[error("Foyer error: {0}")] Foyer(#[source] Box<dyn error::Error + Sync + Send + 'static>), #[error("failed to parse content hash from str: {0}")] HashParse(#[from] ContentHashParseError), #[error("incomplete key: {0}")] IncompleteKey(String), #[error("failed to convert integer: {0}")] IntConvert(#[from] TryFromIntError), #[error("io error: {0}")] Io(#[from] std::io::Error), #[error("join error: {0}")] JoinError(#[from] tokio::task::JoinError), #[error("Layered Event Server send error: {0}")] LayeredEventSend(#[from] Box<tokio::sync::mpsc::error::SendError<LayeredEvent>>), #[error("missing func_run when one was expected: {0}")] MissingFuncRun(FuncRunId), #[error("missing internal buffer entry when expected; this is an internal bug")] MissingInternalBuffer, #[error("ack error: {0}")] NatsAck(#[source] si_data_nats::async_nats::Error), #[error("raw ack error: {0}")] NatsAckRaw(String), #[error("nats client: {0}")] NatsClient(#[from] si_data_nats::Error), #[error("stream consumer error: {0}")] NatsConsumer(#[from] jetstream::stream::ConsumerError), #[error("error while fetching or creating a nats jetsream: {0}")] NatsCreateStream(#[from] jetstream::context::CreateStreamError), #[error("error parsing nats message header: 
{0}")] NatsHeaderParse(#[source] Box<dyn error::Error + Send + Sync + 'static>), #[error("malformed/missing nats headers")] NatsMalformedHeaders, #[error("nats message missing size header")] NatsMissingSizeHeader, #[error("error publishing message: {0}")] NatsPublish(#[from] jetstream::context::PublishError), #[error("error pull message from stream: {0}")] NatsPullMessages(#[from] jetstream::consumer::pull::MessagesError), #[error("consumer stream error: {0}")] NatsStream(#[from] jetstream::consumer::StreamError), #[error("persister task write failed: {0:?}")] PersisterTaskFailed(PersisterTaskError), #[error("persister write error: {0}")] PersisterWriteSend(#[from] Box<tokio::sync::mpsc::error::SendError<PersistMessage>>), #[error("pg error: {0}]")] Pg(#[from] PgError), #[error("pg pool error: {0}]")] PgPool(#[from] PgPoolError), #[error("postcard error: {0}")] Postcard(#[from] postcard::Error), #[error("failed to create retry queue directory: {0}")] RetryQueueDirCreate(#[source] std::io::Error), #[error("failed to read retry queue directory: {0}")] RetryQueueDirRead(#[source] std::io::Error), #[error("failed to delete retry queue file: {0}")] RetryQueueFileDelete(#[source] std::io::Error), #[error("failed to read retry queue file: {0}")] RetryQueueFileRead(#[source] std::io::Error), #[error("failed to write retry queue file: {0}")] RetryQueueFileWrite(#[source] std::io::Error), #[error("invalid retry queue filename: {0:?}")] RetryQueueInvalidFilename(std::ffi::OsString), #[error("retry queue send error: {0}")] RetryQueueSend(String), #[error("S3 error: {0}")] S3(Box<S3Error>), #[error("S3 disk store error: {0}")] S3DiskStore(String), #[error("S3 not configured")] S3NotConfigured, #[error("S3 queue processor error: {0}")] S3QueueProcessor(String), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("tokio oneshot recv error: {0}")] TokioOneShotRecv(#[from] tokio::sync::oneshot::error::RecvError), #[error("unexpected activity variant; 
expected={0}, actual={1}")] UnexpectedActivityVariant(String, String), } impl LayerDbError { pub fn nats_header_parse<E>(err: E) -> Self where E: error::Error + Send + Sync + 'static, { Self::NatsHeaderParse(Box::new(err)) } } pub type LayerDbResult<T> = Result<T, LayerDbError>;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.