handlers.rs
use std::{
    result,
    str::FromStr,
    sync::Arc,
    time::Instant,
};

use edda_core::nats;
use naxum::{
    extract::State,
    response::{
        IntoResponse,
        Response,
    },
};
use si_data_nats::{
    NatsClient,
    Subject,
    async_nats::jetstream::{
        consumer::{
            StreamError,
            push,
        },
        stream::ConsumerError,
    },
};
use si_events::{
    ChangeSetId,
    WorkspacePk,
};
use telemetry::prelude::*;
use thiserror::Error;
use tokio::sync::{
    Notify,
    watch,
};
use tokio_util::{
    sync::CancellationToken,
    task::TaskTracker,
};

use crate::{
    app_state::AppState,
    change_set_processor_task::{
        ChangeSetProcessorTask,
        ChangeSetProcessorTaskError,
    },
    compressing_stream::CompressingStream,
    deployment_processor_task::{
        DeploymentProcessorTask,
        DeploymentProcessorTaskError,
    },
};

#[remain::sorted]
#[derive(Debug, Error)]
pub(crate) enum HandlerError {
    #[error("change set processor error: {0}")]
    ChangeSetProcessor(#[from] ChangeSetProcessorTaskError),
    #[error("change set processor unexpectedly completed without error")]
    ChangeSetProcessorCompleted,
    #[error("change set processor error on tokio join")]
    ChangeSetProcessorJoin,
    #[error("error creating per-change set consumer: {0}")]
    ConsumerCreate(#[source] ConsumerError),
    #[error("deployment processor error: {0}")]
    DeploymentProcessor(#[from] DeploymentProcessorTaskError),
    #[error("failed to parse subject: subject={0}, reason={1}")]
    SubjectParse(String, String),
    #[error("error while subscribing for messages: {0}")]
    Subscribe(#[source] StreamError),
    #[error("task has remaining messages: {0}")]
    TaskHasMessages(String),
    #[error("task interrupted: {0}")]
    TaskInterrupted(String),
}

type Error = HandlerError;

type Result<T> = result::Result<T, HandlerError>;

impl IntoResponse for HandlerError {
    fn into_response(self) -> Response {
        match self {
            Self::SubjectParse(_, _) => {
                warn!(si.error.message = ?self, "subject parse error");
                Response::default_bad_request()
            }
            // While propagated as an `Err`, a task being interrupted is expected behavior and is
            // not an error (rather we use `Err` to ensure the task persists in the stream)
            Self::TaskInterrupted(subject) => {
                debug!(subject, "task interrupted");
                Response::default_service_unavailable()
            }
            _ => {
                error!(si.error.message = ?self, "failed to process message");
                Response::default_internal_server_error()
            }
        }
    }
}

pub(crate) async fn default(State(state): State<AppState>, subject: Subject) -> Result<()> {
    let subject_str = subject.as_str();

    match parse_subject(state.nats.metadata().subject_prefix(), subject_str)? {
        ParsedSubject::Deployment => run_deployment_processor_task(state, subject_str).await,
        ParsedSubject::Workspace(_parsed_workspace_id) => {
            error!("received workspace request, but this is not currently implemented!");
            Ok(())
        }
        ParsedSubject::ChangeSet(parsed_workspace_id, parsed_change_set_id) => {
            run_change_set_processor_task(
                state,
                subject_str,
                parsed_workspace_id,
                parsed_change_set_id,
            )
            .await
        }
    }
}
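/// Runs a processor task over deployment-wide `edda` requests.
///
/// This creates an ordered push consumer filtered to the deployment request subject, wraps the
/// message stream in a [`CompressingStream`], and spawns a [`DeploymentProcessorTask`]. The
/// handler then waits on whichever comes first: server cancellation, processor task completion
/// (always reported as an `Err` so the stream message is nack'd and redelivered), or a
/// quiescent-period shutdown signal.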
async fn run_deployment_processor_task(state: AppState, subject_str: &str) -> Result<()> {
    let AppState {
        metadata,
        nats,
        frigg,
        edda_updates,
        parallel_build_limit,
        requests_stream,
        ctx_builder,
        quiescent_period,
        token: server_token,
        server_tracker,
    } = state;

    let subject_prefix = nats.metadata().subject_prefix();

    let requests_stream_filter_subject = nats::subject::request_for_deployment(subject_prefix);

    let tracker = TaskTracker::new();

    // We want to independently control the lifecycle of our tasks
    let tasks_token = CancellationToken::new();

    let quiesced_token = CancellationToken::new();
    let quiesced_notify = Arc::new(Notify::new());

    let (last_compressing_heartbeat_tx, last_compressing_heartbeat_rx) =
        watch::channel(Instant::now());

    let incoming = requests_stream
        .create_consumer(edda_requests_per_change_set_consumer_config(
            &nats,
            &requests_stream_filter_subject,
        ))
        .await
        .map_err(Error::ConsumerCreate)?
        .messages()
        .await
        .map_err(Error::Subscribe)?;

    let incoming = CompressingStream::new(
        incoming,
        requests_stream.clone(),
        last_compressing_heartbeat_tx,
    );

    let processor_task = DeploymentProcessorTask::create(
        metadata.clone(),
        nats,
        incoming,
        frigg,
        edda_updates,
        parallel_build_limit,
        ctx_builder,
        quiescent_period,
        quiesced_notify.clone(),
        quiesced_token.clone(),
        last_compressing_heartbeat_rx,
        tasks_token.clone(),
        server_tracker,
    );
    let processor_task_result = tracker.spawn(processor_task.try_run());
    tracker.close();

    let result = tokio::select! {
        biased;

        // Cancellation token has fired, time to shut down
        _ = server_token.cancelled() => {
            debug!(
                service.instance.id = metadata.instance_id(),
                "received cancellation",
            );
            // Task may not be complete but was interrupted; reply `Err` to nack for task to
            // persist and retry to continue progress
            Err(Error::TaskInterrupted(subject_str.to_string()))
        }
        // Processor task completed
        processor_task_result_result = processor_task_result => {
            match processor_task_result_result {
                // Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to
                // persist and retry
                Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted),
                // Processor exited with error; reply `Err` to nack for task to persist and retry
                Ok(Err(err)) => Err(Error::DeploymentProcessor(err)),
                // Tokio join error on processor exit; reply `Err` to nack for task to persist and
                // retry
                Err(_join_err) => Err(Error::ChangeSetProcessorJoin),
            }
        }
        // The processor task has signaled to shut down from a quiet period
        _ = quiesced_notify.notified() => {
            debug!(
                service.instance.id = metadata.instance_id(),
                "quiesced notified, starting to shut down",
            );
            // Fire the quiesced_token so that the processing task immediately stops
            // processing additional requests
            quiesced_token.cancel();
            Ok(())
        }
    };

    tasks_token.cancel();
    tracker.wait().await;

    // If the processor task was ended via a quiesced shutdown, then check one last time if there
    // are messages on the subject. This means that during the quiet period-triggered shutdown,
    // another message was published to our subject (such as during this handler waiting on the
    // serial dvu task to finish). In this case, we'll return a new `Err` variant to ensure the
    // task message is `nack`d and the task will be redelivered.
    if let Err(Error::ChangeSetProcessorCompleted) = result {
        match requests_stream
            .get_last_raw_message_by_subject(requests_stream_filter_subject.as_str())
            .await
        {
            // We found a message on the subject
            Ok(message) => {
                debug!(
                    messaging.message.id = message.sequence,
                    messaging.destination.name = message.subject.as_str(),
                    service.instance.id = metadata.instance_id(),
                    "message found after graceful shutdown",
                );
                Err(Error::TaskHasMessages(
                    requests_stream_filter_subject.to_string(),
                ))
            }
            // Either there was not a message or another error with this call. Either way, we can
            // return the current `result` value
            Err(_) => result,
        }
    } else {
        // In all other cases, return our computed `result` value
        result
    }
}
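/// Runs a processor task over requests for a single change set.
///
/// This mirrors [`run_deployment_processor_task`], except that the consumer is filtered to a
/// single workspace/change set subject and a [`ChangeSetProcessorTask`] is spawned instead.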
async fn run_change_set_processor_task(
    state: AppState,
    subject_str: &str,
    workspace: ParsedWorkspaceId<'_>,
    change_set: ParsedChangeSetId<'_>,
) -> Result<()> {
    let AppState {
        metadata,
        nats,
        frigg,
        edda_updates,
        parallel_build_limit,
        requests_stream,
        ctx_builder,
        quiescent_period,
        token: server_token,
        server_tracker,
    } = state;

    let subject_prefix = nats.metadata().subject_prefix();

    let requests_stream_filter_subject =
        nats::subject::request_for_change_set(subject_prefix, workspace.str, change_set.str);

    let tracker = TaskTracker::new();

    // We want to independently control the lifecycle of our tasks
    let tasks_token = CancellationToken::new();

    let quiesced_token = CancellationToken::new();
    let quiesced_notify = Arc::new(Notify::new());

    let (last_compressing_heartbeat_tx, last_compressing_heartbeat_rx) =
        watch::channel(Instant::now());

    let incoming = requests_stream
        .create_consumer(edda_requests_per_change_set_consumer_config(
            &nats,
            &requests_stream_filter_subject,
        ))
        .await
        .map_err(Error::ConsumerCreate)?
        .messages()
        .await
        .map_err(Error::Subscribe)?;

    let incoming = CompressingStream::new(
        incoming,
        requests_stream.clone(),
        last_compressing_heartbeat_tx,
    );

    let processor_task = ChangeSetProcessorTask::create(
        metadata.clone(),
        nats,
        incoming,
        frigg,
        edda_updates,
        parallel_build_limit,
        workspace.id,
        change_set.id,
        ctx_builder,
        quiescent_period,
        quiesced_notify.clone(),
        quiesced_token.clone(),
        last_compressing_heartbeat_rx,
        tasks_token.clone(),
        server_tracker,
    );
    let processor_task_result = tracker.spawn(processor_task.try_run());
    tracker.close();

    let result = tokio::select! {
        biased;

        // Cancellation token has fired, time to shut down
        _ = server_token.cancelled() => {
            debug!(
                service.instance.id = metadata.instance_id(),
                si.workspace.id = %workspace.str,
                si.change_set.id = %change_set.str,
                "received cancellation",
            );
            // Task may not be complete but was interrupted; reply `Err` to nack for task to
            // persist and retry to continue progress
            Err(Error::TaskInterrupted(subject_str.to_string()))
        }
        // Processor task completed
        processor_task_result_result = processor_task_result => {
            match processor_task_result_result {
                // Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to
                // persist and retry
                Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted),
                // Processor exited with error; reply `Err` to nack for task to persist and retry
                Ok(Err(err)) => Err(Error::ChangeSetProcessor(err)),
                // Tokio join error on processor exit; reply `Err` to nack for task to persist and
                // retry
                Err(_join_err) => Err(Error::ChangeSetProcessorJoin),
            }
        }
        // The processor task has signaled to shut down from a quiet period
        _ = quiesced_notify.notified() => {
            debug!(
                service.instance.id = metadata.instance_id(),
                si.workspace.id = %workspace.str,
                si.change_set.id = %change_set.str,
                "quiesced notified, starting to shut down",
            );
            // Fire the quiesced_token so that the processing task immediately stops
            // processing additional requests
            quiesced_token.cancel();
            Ok(())
        }
    };

    tasks_token.cancel();
    tracker.wait().await;

    // If the processor task was ended via a quiesced shutdown, then check one last time if there
    // are messages on the subject. This means that during the quiet period-triggered shutdown,
    // another message was published to our subject (such as during this handler waiting on the
    // serial dvu task to finish). In this case, we'll return a new `Err` variant to ensure the
    // task message is `nack`d and the task will be redelivered.
    if let Err(Error::ChangeSetProcessorCompleted) = result {
        match requests_stream
            .get_last_raw_message_by_subject(requests_stream_filter_subject.as_str())
            .await
        {
            // We found a message on the subject
            Ok(message) => {
                debug!(
                    messaging.message.id = message.sequence,
                    messaging.destination.name = message.subject.as_str(),
                    service.instance.id = metadata.instance_id(),
                    si.change_set.id = %change_set.str,
                    si.workspace.id = %workspace.str,
                    "message found after graceful shutdown",
                );
                Err(Error::TaskHasMessages(
                    requests_stream_filter_subject.to_string(),
                ))
            }
            // Either there was not a message or another error with this call. Either way, we can
            // return the current `result` value
            Err(_) => result,
        }
    } else {
        // In all other cases, return our computed `result` value
        result
    }
}
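/// A request subject parsed into its scope: deployment-wide, per-workspace, or per-change set.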
#[remain::sorted]
enum ParsedSubject<'a> {
    ChangeSet(ParsedWorkspaceId<'a>, ParsedChangeSetId<'a>),
    Deployment,
    Workspace(ParsedWorkspaceId<'a>),
}

struct ParsedWorkspaceId<'a> {
    id: WorkspacePk,
    str: &'a str,
}

struct ParsedChangeSetId<'a> {
    id: ChangeSetId,
    str: &'a str,
}
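/// Parses a subject into a [`ParsedSubject`].
///
/// After the optional subject prefix, the accepted shapes are (the first two parts are not
/// inspected):
///
/// - `<part>.<part>.deployment.process`
/// - `<part>.<part>.workspace.<workspace_id>.process`
/// - `<part>.<part>.change_set.<workspace_id>.<change_set_id>.process`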
#[inline]
fn parse_subject<'a>(
    subject_prefix: Option<&str>,
    subject_str: &'a str,
) -> Result<ParsedSubject<'a>> {
    let mut parts = subject_str.split('.');

    if let Some(prefix) = subject_prefix {
        match parts.next() {
            // Prefix part matches expected/configured prefix
            Some(parsed_prefix) if parsed_prefix == prefix => {}
            // Prefix part does not match expected/configured prefix
            Some(unexpected) => {
                return Err(Error::SubjectParse(
                    subject_str.to_string(),
                    format!(
                        "found unexpected subject prefix; expected={prefix}, parsed={unexpected}"
                    ),
                ));
            }
            // Prefix part not found but expected
            None => {
                return Err(Error::SubjectParse(
                    subject_str.to_string(),
                    format!("expected subject prefix not found; expected={prefix}"),
                ));
            }
        };
    }

    match (
        parts.next(),
        parts.next(),
        parts.next(),
        parts.next(),
        parts.next(),
        parts.next(),
        parts.next(),
        // assert last part is `None` to ensure there are no additional parts
    ) {
        // A deployment-wide request
        (Some(_), Some(_), Some("deployment"), Some("process"), None, None, None) => {
            Ok(ParsedSubject::Deployment)
        }
        // A workspace request
        (
            Some(_),
            Some(_),
            Some("workspace"),
            Some(workspace_id_str),
            Some("process"),
            None,
            None,
        ) => {
            let workspace_id = WorkspacePk::from_str(workspace_id_str).map_err(|err| {
                Error::SubjectParse(
                    subject_str.to_string(),
                    format!("workspace id parse error: {err}"),
                )
            })?;

            Ok(ParsedSubject::Workspace(ParsedWorkspaceId {
                id: workspace_id,
                str: workspace_id_str,
            }))
        }
        // A change set request
        (
            Some(_),
            Some(_),
            Some("change_set"),
            Some(workspace_id_str),
            Some(change_set_id_str),
            Some("process"),
            None,
        ) => {
            let workspace_id = WorkspacePk::from_str(workspace_id_str).map_err(|err| {
                Error::SubjectParse(
                    subject_str.to_string(),
                    format!("workspace id parse error: {err}"),
                )
            })?;
            let change_set_id = ChangeSetId::from_str(change_set_id_str).map_err(|err| {
                Error::SubjectParse(
                    subject_str.to_string(),
                    format!("change set id parse error: {err}"),
                )
            })?;

            Ok(ParsedSubject::ChangeSet(
                ParsedWorkspaceId {
                    id: workspace_id,
                    str: workspace_id_str,
                },
                ParsedChangeSetId {
                    id: change_set_id,
                    str: change_set_id_str,
                },
            ))
        }
        _ => Err(Error::SubjectParse(
            subject_str.to_string(),
            "subject failed to parse with unexpected parts".to_string(),
        )),
    }
}

fn edda_requests_per_change_set_consumer_config(
    nats: &NatsClient,
    filter_subject: &Subject,
) -> push::OrderedConfig {
    push::OrderedConfig {
        deliver_subject: nats.new_inbox(),
        filter_subject: filter_subject.to_string(),
        ..Default::default()
    }
}
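// A minimal test sketch (not part of the original file) covering the subject shapes that can be
// exercised without constructing valid workspace or change set ids. The subject tokens used
// below are illustrative placeholders.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_deployment_subject() {
        // The first two subject parts are not inspected, so any tokens work here
        let parsed = parse_subject(None, "edda.requests.deployment.process")
            .expect("deployment subject should parse");
        assert!(matches!(parsed, ParsedSubject::Deployment));
    }

    #[test]
    fn rejects_mismatched_prefix() {
        let result = parse_subject(Some("prefix"), "other.a.b.deployment.process");
        assert!(matches!(result, Err(HandlerError::SubjectParse(_, _))));
    }
}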
