handlers.rs
use std::{
    result,
    sync::Arc,
};

use naxum::{
    extract::State,
    response::{
        IntoResponse,
        Response,
    },
};
use rebaser_core::nats;
use si_data_nats::{
    NatsClient,
    Subject,
    async_nats::jetstream::{
        consumer::{
            StreamError,
            push,
        },
        stream::ConsumerError,
    },
};
use telemetry::prelude::*;
use thiserror::Error;
use tokio::sync::Notify;
use tokio_util::{
    sync::CancellationToken,
    task::TaskTracker,
};

use crate::{
    Shutdown,
    app_state::AppState,
    change_set_processor_task::{
        ChangeSetProcessorTask,
        ChangeSetProcessorTaskError,
    },
    serial_dvu_task::{
        SerialDvuTask,
        SerialDvuTaskError,
    },
    subject::{
        SubjectParseError,
        parse_task_subject,
    },
};

#[remain::sorted]
#[derive(Debug, Error)]
pub(crate) enum HandlerError {
    #[error("change set processor error: {0}")]
    ChangeSetProcessor(#[from] ChangeSetProcessorTaskError),
    #[error("change set processor unexpectedly completed without error")]
    ChangeSetProcessorCompleted,
    #[error("change set processor error on tokio join")]
    ChangeSetProcessorJoin,
    #[error("error creating per-change set consumer: {0}")]
    ConsumerCreate(#[source] ConsumerError),
    #[error("serial dvu error: {0}")]
    SerialDvu(#[from] SerialDvuTaskError),
    #[error("serial dvu unexpectedly completed without error")]
    SerialDvuCompleted,
    #[error("serial dvu error on tokio join")]
    SerialDvuJoin,
    #[error("subject parse error: {0}")]
    SubjectParse(#[from] SubjectParseError),
    #[error("error while subscribing for messages: {0}")]
    Subscribe(#[source] StreamError),
    #[error("task has remaining messages: {0}")]
    TaskHasMessages(String),
    #[error("task interrupted: {0}")]
    TaskInterrupted(String),
}

type Error = HandlerError;

type Result<T> = result::Result<T, HandlerError>;

impl IntoResponse for HandlerError {
    fn into_response(self) -> Response {
        match self {
            HandlerError::SubjectParse(_) => {
                warn!(si.error.message = ?self, "subject parse error");
                Response::default_bad_request()
            }
            // While propagated as an `Err`, a task being interrupted is expected behavior and is
            // not an error (rather we use `Err` to ensure the task persists in the stream)
            HandlerError::TaskInterrupted(subject) => {
                debug!(subject, "task interrupted");
                Response::default_service_unavailable()
            }
            _ => {
                error!(si.error.message = ?self, "failed to process message");
                Response::default_internal_server_error()
            }
        }
    }
}

pub(crate) async fn default(State(state): State<AppState>, subject: Subject) -> Result<()> {
    let AppState {
        metadata,
        nats,
        pinga,
        edda,
        requests_stream,
        dead_letter_queue,
        ctx_builder,
        quiescent_period,
        token: server_token,
        server_tracker,
        features,
    } = state;

    let subject_prefix = nats.metadata().subject_prefix();

    let subject_str = subject.as_str();
    let (workspace, change_set) = parse_task_subject(subject_prefix, subject_str)?;

    let requests_stream_filter_subject = nats::subject::enqueue_updates_for_change_set(
        subject_prefix,
        workspace.str(),
        change_set.str(),
    );

    let tracker = TaskTracker::new();
    // We want to independently control the lifecycle of our tasks
    let tasks_token = CancellationToken::new();

    let run_dvu_notify = Arc::new(Notify::new());
    let quiesced_token = CancellationToken::new();
    let quiesced_notify = Arc::new(Notify::new());

    let incoming = requests_stream
        .create_consumer(rebaser_requests_per_change_set_consumer_config(
            &nats,
            &requests_stream_filter_subject,
        ))
        .await
        .map_err(HandlerError::ConsumerCreate)?
        .messages()
        .await
        .map_err(HandlerError::Subscribe)?;

    let dvu_task = SerialDvuTask::create(
        metadata.clone(),
        workspace.id(),
        change_set.id(),
        pinga.clone(),
        run_dvu_notify.clone(),
        quiesced_notify.clone(),
        quiesced_token.clone(),
        tasks_token.clone(),
    );

    let processor_task = ChangeSetProcessorTask::create(
        metadata.clone(),
        nats,
        requests_stream.clone(),
        dead_letter_queue,
        incoming,
        edda,
        workspace.id(),
        change_set.id(),
        ctx_builder,
        run_dvu_notify,
        quiescent_period,
        quiesced_notify,
        quiesced_token,
        tasks_token.clone(),
        server_tracker,
        features,
    );

    let dvu_task_result = tracker.spawn(dvu_task.try_run());
    let processor_task_result = tracker.spawn(processor_task.try_run());
    tracker.close();

    let result = tokio::select! {
        biased;

        // Cancellation token has fired, time to shut down
        _ = server_token.cancelled() => {
            debug!(
                service.instance.id = metadata.instance_id(),
                si.workspace.id = %workspace.str(),
                si.change_set.id = %change_set.str(),
                "received cancellation",
            );
            // Task may not be complete but was interrupted; reply `Err` to nack for task to
            // persist and retry to continue progress
            Err(Error::TaskInterrupted(subject_str.to_string()))
        }
        // Processor task completed
        processor_task_result_result = processor_task_result => {
            match processor_task_result_result {
                // Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to
                // persist and retry
                Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted),
                // Processor exited with error; reply `Err` to nack for task to persist and retry
                Ok(Err(err)) => Err(Error::ChangeSetProcessor(err)),
                // Tokio join error on processor exit; reply `Err` to nack for task to persist and
                // retry
                Err(_join_err) => Err(Error::ChangeSetProcessorJoin),
            }
        }
        // Serial dvu task completed
        dvu_task_result_result = dvu_task_result => {
            match dvu_task_result_result {
                // A quiet period was found in the stream; reply Ok to ack and remove this task
                Ok(Ok(Shutdown::Quiesced)) => Ok(()),
                // Serial dvu exited cleanly, but unexpectedly; reply `Err` to nack for task to
                // persist and retry
                Ok(Ok(Shutdown::Graceful)) => Err(Error::SerialDvuCompleted),
                // Serial dvu exited with error; reply `Err` to nack for task to persist and retry
                Ok(Err(err)) => Err(Error::SerialDvu(err)),
                // Tokio join error on serial dvu exit; reply `Err` to nack for task to persist and
                // retry
                Err(_join_err) => Err(Error::SerialDvuJoin),
            }
        }
    };

    tasks_token.cancel();
    tracker.wait().await;

    // If the processor task was ended via a quiesced shutdown, then check one last time if there
    // are messages on the subject. This means that during the quiet period-triggered shutdown,
    // another message was published to our subject (such as during this handler waiting on the
    // serial dvu task to finish). In this case, we'll return a new `Err` variant to ensure the
    // task message is `nack`d and the task will be redelivered.
    if let Err(Error::ChangeSetProcessorCompleted) = result {
        match requests_stream
            .get_last_raw_message_by_subject(requests_stream_filter_subject.as_str())
            .await
        {
            // We found a message on the subject
            Ok(message) => {
                debug!(
                    messaging.message.id = message.sequence,
                    messaging.destination.name = message.subject.as_str(),
                    service.instance.id = metadata.instance_id(),
                    si.change_set.id = %change_set.str(),
                    si.workspace.id = %workspace.str(),
                    "message found after graceful shutdown",
                );
                Err(Error::TaskHasMessages(
                    requests_stream_filter_subject.to_string(),
                ))
            }
            // Either there was not a message or another error with this call. Either way, we can
            // return the current `result` value
            Err(_) => result,
        }
    } else {
        // In all other cases, return our computed `result` value
        result
    }
}

fn rebaser_requests_per_change_set_consumer_config(
    nats: &NatsClient,
    filter_subject: &Subject,
) -> push::OrderedConfig {
    push::OrderedConfig {
        deliver_subject: nats.new_inbox(),
        filter_subject: filter_subject.to_string(),
        ..Default::default()
    }
}
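For context, the per-change-set consumer built by rebaser_requests_per_change_set_consumer_config is an ordered push consumer whose filter_subject restricts delivery to a single workspace/change set pair. Below is a minimal standalone sketch of that same pattern using async_nats directly; the server address, stream name, and subject are illustrative assumptions, not values taken from this handler.

// Sketch only: assumes a local NATS server and an existing "REBASER_REQUESTS"
// stream (both hypothetical names).
use async_nats::jetstream::{self, consumer::push};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("localhost:4222").await?;
    let context = jetstream::new(client.clone());

    // Attach an ordered push consumer that only sees messages for a single
    // workspace/change set pair, mirroring the filter-subject approach above.
    let stream = context.get_stream("REBASER_REQUESTS").await?;
    let consumer = stream
        .create_consumer(push::OrderedConfig {
            deliver_subject: client.new_inbox(),
            // Hypothetical subject; the real subject is produced by
            // `nats::subject::enqueue_updates_for_change_set`.
            filter_subject: "rebaser.requests.workspace_1.change_set_1".to_string(),
            ..Default::default()
        })
        .await?;

    // Consume messages, in order, for this change set only.
    let mut messages = consumer.messages().await?;
    while let Some(message) = messages.next().await {
        let message = message?;
        println!("received update on {}", message.subject);
    }

    Ok(())
}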
