Skip to main content
Glama
consumer.rs9.03 kB
use std::time::Duration; use async_trait::async_trait; use pinga_core::api_types::job_execution_request::JobArgsVCurrent; use rand::Rng; use serde_json::Value; use si_data_nats::NatsError; use si_data_pg::PgPoolError; use si_db::HistoryActor; use si_events::authentication_method::AuthenticationMethodV1; use si_id::{ ChangeSetId, WorkspacePk, }; use thiserror::Error; use tokio::task::JoinError; use crate::{ AccessBuilder, ActionPrototypeId, ChangeSetError, ComponentError, ComponentId, DalContext, DalContextBuilder, FuncError, TransactionsError, WorkspaceSnapshotError, WsEventError, action::{ ActionError, prototype::ActionPrototypeError, }, attribute::value::AttributeValueError, billing_publish::BillingPublishError, diagram::DiagramError, func::{ debug::DebugFuncError, runner::FuncRunnerError, }, job::{ definition::{ ManagementFuncJobError, dependent_values_update::DependentValueUpdateError, }, producer::BlockingJobError, }, prop::PropError, validation::ValidationError, }; #[remain::sorted] #[derive(Error, Debug)] #[allow(clippy::large_enum_variant)] pub enum JobConsumerError { #[error("action error: {0}")] Action(#[from] Box<ActionError>), #[error("action prototype error: {0}")] ActionPrototype(#[from] Box<ActionPrototypeError>), #[error("ActionProtoype {0} not found")] ActionPrototypeNotFound(ActionPrototypeId), #[error("attribute value error: {0}")] AttributeValue(#[from] Box<AttributeValueError>), #[error("billing publish error: {0}")] BillingPublish(#[from] BillingPublishError), #[error("Error blocking on job: {0}")] BlockingJob(#[from] Box<BlockingJobError>), #[error("change set error: {0}")] ChangeSet(#[from] Box<ChangeSetError>), #[error("component error: {0}")] Component(#[from] Box<ComponentError>), #[error("component {0} is destroyed")] ComponentIsDestroyed(ComponentId), #[error("debug func error: {0}")] DebugFunc(#[from] Box<DebugFuncError>), #[error("dependent value update error: {0}")] DependentValueUpdate(#[from] Box<DependentValueUpdateError>), #[error("diagram error: {0}")] Diagram(#[from] Box<DiagramError>), #[error("func error: {0}")] Func(#[from] Box<FuncError>), #[error("func runner error: {0}")] FuncRunner(#[from] Box<FuncRunnerError>), #[error("Invalid job arguments. Expected: {0} Actual: {1:?}")] InvalidArguments(String, Vec<Value>), #[error("std io error: {0}")] Io(#[from] ::std::io::Error), #[error("management function error: {0}")] ManagementFunc(#[from] Box<ManagementFuncJobError>), #[error("nats error: {0}")] Nats(#[from] NatsError), #[error("nats is unavailable")] NatsUnavailable, #[error("pg pool error: {0}")] PgPool(#[from] PgPoolError), #[error("prop error: {0}")] Prop(#[from] Box<PropError>), #[error("execution of job {0} failed after {1} retry attempts")] RetriesFailed(JobArgsVCurrent, u32), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("tokio task error: {0}")] TokioTask(#[from] JoinError), #[error("transactions error: {0}")] Transactions(#[from] Box<TransactionsError>), #[error("ulid decode error: {0}")] UlidDecode(#[from] ulid::DecodeError), #[error("validation error: {0}")] Validation(#[from] Box<ValidationError>), #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] Box<WorkspaceSnapshotError>), #[error("ws event error: {0}")] WsEvent(#[from] Box<WsEventError>), } impl From<JobConsumerError> for std::io::Error { fn from(jce: JobConsumerError) -> Self { Self::new(std::io::ErrorKind::InvalidData, jce) } } impl From<ActionError> for JobConsumerError { fn from(value: ActionError) -> Self { Box::new(value).into() } } impl From<AttributeValueError> for JobConsumerError { fn from(value: AttributeValueError) -> Self { Box::new(value).into() } } impl From<ComponentError> for JobConsumerError { fn from(value: ComponentError) -> Self { Box::new(value).into() } } impl From<DiagramError> for JobConsumerError { fn from(value: DiagramError) -> Self { Box::new(value).into() } } impl From<FuncError> for JobConsumerError { fn from(value: FuncError) -> Self { Box::new(value).into() } } impl From<FuncRunnerError> for JobConsumerError { fn from(value: FuncRunnerError) -> Self { Box::new(value).into() } } impl From<WorkspaceSnapshotError> for JobConsumerError { fn from(value: WorkspaceSnapshotError) -> Self { Box::new(value).into() } } impl From<ActionPrototypeError> for JobConsumerError { fn from(value: ActionPrototypeError) -> Self { Box::new(value).into() } } impl From<BlockingJobError> for JobConsumerError { fn from(value: BlockingJobError) -> Self { Box::new(value).into() } } impl From<ChangeSetError> for JobConsumerError { fn from(value: ChangeSetError) -> Self { Box::new(value).into() } } impl From<DependentValueUpdateError> for JobConsumerError { fn from(value: DependentValueUpdateError) -> Self { Box::new(value).into() } } impl From<ManagementFuncJobError> for JobConsumerError { fn from(value: ManagementFuncJobError) -> Self { Box::new(value).into() } } impl From<PropError> for JobConsumerError { fn from(value: PropError) -> Self { Box::new(value).into() } } impl From<TransactionsError> for JobConsumerError { fn from(value: TransactionsError) -> Self { Box::new(value).into() } } impl From<ValidationError> for JobConsumerError { fn from(value: ValidationError) -> Self { Box::new(value).into() } } impl From<WsEventError> for JobConsumerError { fn from(value: WsEventError) -> Self { Box::new(value).into() } } pub type JobConsumerResult<T> = Result<T, JobConsumerError>; pub enum RetryBackoff { Exponential, None, } /// Jobs that return a state of `JobCompletionState::Retry` will be retried /// with the requested backoff and limit pub enum JobCompletionState { Retry { /// Maximum number of retries limit: u32, /// Retry backoff strategy backoff: RetryBackoff, /// Minimum sleep ms for retry minimum: u32, }, Done, } #[async_trait] pub trait DalJob: std::fmt::Debug + Sync + Send { fn args(&self) -> JobArgsVCurrent; fn workspace_id(&self) -> WorkspacePk; fn change_set_id(&self) -> ChangeSetId; } #[async_trait] // Having Sync as a supertrait gets around triggering https://github.com/rust-lang/rust/issues/51443 pub trait JobConsumer: std::fmt::Debug + Sync + DalJob { /// Intended to be defined by implementations of this trait. async fn run(&self, ctx: &mut DalContext) -> JobConsumerResult<JobCompletionState>; /// Called on the trait object to set up the data necessary to run the job, /// and in-turn calls the `run` method. Can be overridden by an implementation /// of the trait if you need more control over how the `DalContext` is managed /// during the lifetime of the job. async fn run_job(&self, ctx_builder: DalContextBuilder) -> JobConsumerResult<()> { let request_context = AccessBuilder::new( self.workspace_id().into(), HistoryActor::SystemInit, None, AuthenticationMethodV1::System, ) .build(self.change_set_id().into()); let mut retries = 0; loop { let mut ctx = ctx_builder.build(request_context.clone()).await?; match self.run(&mut ctx).await? { JobCompletionState::Retry { limit, minimum, backoff, } => { if retries >= limit { return Err(JobConsumerError::RetriesFailed(self.args(), retries)); } if let RetryBackoff::Exponential = backoff { tokio::time::sleep(calculate_exponential_sleep_ms(retries, 2, minimum)) .await; }; } JobCompletionState::Done => { break; } } retries = retries.saturating_add(1); } Ok(()) } } fn calculate_exponential_sleep_ms(retry_no: u32, base: u32, minimum: u32) -> Duration { let sleep_micros = base.pow(retry_no).saturating_mul(minimum); let mut rng = rand::thread_rng(); // "full" jitter, to prevent "thundering herd". On average this still gives // us an exponential distribution let jittered_micros = rng.gen_range(minimum..=sleep_micros); Duration::from_micros(jittered_micros.into()) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server