Skip to main content
Glama
lib.rs5.35 kB
//! This crate provides an opinionated [`naxum`] server that "shuttles" (consumes on a source stream and publishes to a //! destination subject) NATS JetStream stream messages to another subject until a "final message" (a message with //! [`FINAL_MESSAGE_HEADER_KEY`] in its headers) is seen. #![warn( bad_style, clippy::missing_panics_doc, clippy::panic, clippy::panic_in_result_fn, clippy::unwrap_in_result, clippy::unwrap_used, dead_code, improper_ctypes, missing_debug_implementations, missing_docs, no_mangle_generic_items, non_shorthand_field_patterns, overflowing_literals, path_statements, patterns_in_fns_without_body, unconditional_recursion, unreachable_pub, unused, unused_allocation, unused_comparisons, unused_parens, while_true )] use std::{ future::IntoFuture, io, }; use futures::Future; use middleware::DeleteMessageOnSuccess; use naxum::{ ServiceBuilder, ServiceExt, TowerServiceExt, handler::Handler, middleware::{ jetstream_post_process::JetstreamPostProcessLayer, trace::TraceLayer, }, response::{ IntoResponse, Response, }, }; use si_data_nats::{ NatsClient, Subject, async_nats::{ self, jetstream::{ consumer::StreamErrorKind, context::RequestErrorKind, stream::ConsumerErrorKind, }, }, jetstream, }; use telemetry::{ prelude::*, tracing::error, }; use thiserror::Error; use tokio_util::sync::CancellationToken; mod app_state; mod handlers; mod middleware; pub use shuttle_core::FINAL_MESSAGE_HEADER_KEY; #[allow(missing_docs)] #[remain::sorted] #[derive(Debug, Error)] pub enum ShuttleError { #[error("async nats consumer error: {0}")] AsyncNatsConsumer(#[from] async_nats::error::Error<ConsumerErrorKind>), #[error("async nats request error: {0}")] AsyncNatsRequest(#[from] async_nats::error::Error<RequestErrorKind>), #[error("async nats stream error: {0}")] AsyncNatsStream(#[from] async_nats::error::Error<StreamErrorKind>), #[error("naxum error: {0}")] Naxum(#[source] io::Error), } type Result<T> = std::result::Result<T, ShuttleError>; /// A running, opinionated [`naxum`] server that "shuttles" messages from a limits-based stream to /// another given subject. pub struct Shuttle { source_subject: Subject, destination_subject: Subject, inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>, } impl std::fmt::Debug for Shuttle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Shuttle") .field("source_subject", &self.source_subject) .field("destination_subject", &self.destination_subject) .finish_non_exhaustive() } } impl Shuttle { /// Creates a new running [`Shuttle`] instance. #[instrument( name = "shuttle.new", level = "trace", skip_all, fields(source_subject, destination_subject) )] pub async fn new( nats: NatsClient, limits_based_source_stream: async_nats::jetstream::stream::Stream, source_subject: Subject, destination_subject: Subject, ) -> Result<Self> { let self_shutdown_token = CancellationToken::new(); let deliver_subject = nats.new_inbox(); let connection_metadata = nats.metadata_clone(); let context = jetstream::new(nats); let incoming = { limits_based_source_stream .create_consumer(async_nats::jetstream::consumer::push::OrderedConfig { deliver_subject, filter_subject: source_subject.to_string(), ..Default::default() }) .await? .messages() .await? }; let state = crate::app_state::AppState::new( context.clone(), destination_subject.clone(), self_shutdown_token.clone(), ); let app = ServiceBuilder::new() .layer( TraceLayer::new() .make_span_with( telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(), ) .on_response(telemetry_nats::NatsOnResponse::new()), ) .layer( JetstreamPostProcessLayer::new() .on_success(DeleteMessageOnSuccess::new(limits_based_source_stream)), ) .service(crate::handlers::default.with_state(state)) .map_response(Response::into_response); let inner = naxum::serve(incoming, app.into_make_service()) .with_graceful_shutdown(naxum::wait_on_cancelled(self_shutdown_token)); Ok(Self { source_subject, destination_subject, inner: Box::new(inner.into_future()), }) } /// Fallibly awaits the inner naxum task. #[instrument(name = "shuttle.try_run", level = "trace", skip_all)] pub async fn try_run(self) -> Result<()> { self.inner.await.map_err(ShuttleError::Naxum)?; trace!(%self.source_subject, %self.destination_subject, "shuttle main loop shutdown complete"); Ok(()) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server