Skip to main content
Glama
main.rs9.27 kB
use std::{ convert::Infallible, error, sync::Arc, }; use async_nats::Subject; use futures::{ StreamExt, stream, }; use naxum::{ Head, Json, extract::State, handler::Handler, }; use serde::{ Deserialize, Serialize, }; use tokio::{ signal::unix::{ self, SignalKind, }, sync::Notify, }; use tokio_util::{ sync::CancellationToken, task::TaskTracker, }; use tower::ServiceBuilder; use tracing::info; use tracing_subscriber::{ EnvFilter, Registry, fmt::{ self, format::FmtSpan, }, layer::SubscriberExt as _, util::SubscriberInitExt as _, }; use self::message::LocalMessage; const TRACING_LOG_ENV_VAR: &str = "SI_LOG"; const DEFAULT_TRACING_DIRECTIVES: &str = "nats_less_messages=trace,naxum=trace,info"; #[derive(Clone, Debug, Serialize, Deserialize)] struct MyRequest { id: usize, message: String, is_cool: bool, } #[derive(Clone, Debug)] struct AppState {} async fn default( State(_state): State<AppState>, // [`LoaclMessage`] impls [`MessageHead`] and so [`Head`] info can be extracted head: Head, // Message payload has been JSON-encoded so the default [`Json`] extractor works as-is Json(request): Json<MyRequest>, ) -> naxum::response::Result<()> { info!( subject = head.subject.as_str(), reply = head.reply.as_deref(), headers_size = head.headers.map(|headers| headers.len()), status = head.status.map(|status| status.as_u16()), description = head.description.as_ref(), length = head.length, extensions_size = head.extensions.len(), request = ?request, "processing message", ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Box<dyn error::Error>> { Registry::default() .with( EnvFilter::try_from_env(TRACING_LOG_ENV_VAR) .unwrap_or_else(|_| EnvFilter::new(DEFAULT_TRACING_DIRECTIVES)), ) .with( fmt::layer() .with_thread_ids(true) .with_thread_names(true) .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) .pretty(), ) .try_init()?; // Generate local `impl Stream` of a `Vec` of `LocalMessage` which has an `impl MessageHead`, // thus satisfying Naxum's incoming stream requirements let incoming = stream::iter(messages()?) // The iterator stream is a stream of `Option<LocalMessage>` so we convert this into a // stream of `Option<Result<LocalMessage, Infallible>>` .map(Ok::<_, Infallible>); // Setup a Tower `Service` stack with some middleware let app = ServiceBuilder::new().service(default.with_state(AppState {})); // Use a Tokio `TaskTracker` and `CancellationToken` to support signal handling and graceful // shutdown let tracker = TaskTracker::new(); let token = CancellationToken::new(); let naxum_terminated = Arc::new(Notify::new()); let naxum_exited = naxum_terminated.clone(); let naxum_token = token.clone(); tracker.spawn(async move { info!("ready to receive messages from a locally produced channel",); let result = naxum::serve(incoming, app.into_make_service()) .with_graceful_shutdown(naxum::wait_on_cancelled(naxum_token)) .await; naxum_terminated.notify_one(); result }); // Create streams of `SIGINT` (i.e. `Ctrl+c`) and `SIGTERM` signals let mut sig_int = unix::signal(SignalKind::interrupt())?; let mut sig_term = unix::signal(SignalKind::terminate())?; // Wait until one of the signal streams gets a signal, after which we will close the task // tracker and cancel the token, signaling all holders of the token tokio::select! { _ = sig_int.recv() => { info!("received SIGINT, performing graceful shutdown"); tracker.close(); token.cancel(); } _ = sig_term.recv() => { info!("received SIGTERM, performing graceful shutdown"); tracker.close(); token.cancel(); } _ = naxum_exited.notified() => { info!("naxum app exited, performing graceful shutdown"); tracker.close(); token.cancel(); } } // Wait for all tasks to finish tracker.wait().await; info!("graceful shutdown complete"); Ok(()) } fn messages() -> Result<Vec<LocalMessage>, Box<dyn error::Error>> { // Produce a collection of request messages let messages = vec![ MyRequest { id: 1, message: "Rush".to_owned(), is_cool: true, }, MyRequest { id: 2, message: "Fly by Night".to_owned(), is_cool: false, }, MyRequest { id: 3, message: "Caress of Steel".to_owned(), is_cool: true, }, MyRequest { id: 4, message: "2112".to_owned(), is_cool: false, }, MyRequest { id: 5, message: "A Farewell to Kings".to_owned(), is_cool: true, }, ] .into_iter() // Encode each request into a [`LocalMessage`] which implements the [`MessageHead`] trait .map(|request| LocalMessage { subject: Subject::from_utf8(format!("my.requests.{}", request.id)) .expect("failed to parse subject"), headers: None, payload: serde_json::to_vec(&request) .expect("failed to serialize") .into(), }) .collect(); Ok(messages) } mod message { use async_nats::{ HeaderMap, Subject, }; use bytes::Bytes; use naxum::{ Extensions, Head, MessageHead, }; // A local alternative to the core/Jetstream NATS message types #[derive(Clone, Debug)] pub struct LocalMessage { pub subject: Subject, pub headers: Option<HeaderMap>, pub payload: Bytes, } // Implementing the [`MessageHead`] trait makes [`LocalMessage`] a valid message type for a // Naxum app impl MessageHead for LocalMessage { // Each message needs a subject, but this can be anything, assuming it operates within the // naming rules of a [`Subject`] fn subject(&self) -> &Subject { &self.subject } // This type won't have reply subjects as we don't use NATS with this type fn reply(&self) -> Option<&Subject> { None } // Headers might still be useful to propagate OpenTelemetry spans, allow for more complex // message encodings, etc. fn headers(&self) -> Option<&async_nats::HeaderMap> { self.headers.as_ref() } // This type won't use a status as we don't use NATS with this type fn status(&self) -> Option<naxum::StatusCode> { None } // No need for a description either, so this will always be `None` fn description(&self) -> Option<&str> { None } // It's not worth adding the extra size calculations for the message (i.e. it's not going // to be encoded as bytes), so return the payload size as a rough approximation fn length(&self) -> usize { self.payload_length() } // Payload is still encoded in [`Bytes`] so use its length as normal fn payload_length(&self) -> usize { self.payload.len() } fn from_head_and_payload( head: naxum::Head, payload: bytes::Bytes, ) -> Result<(Self, naxum::Extensions), naxum::FromPartsError> where Self: Sized, { let Head { subject, // Replies aren't tracked with this type reply: _, headers, // Status isn't tracked with this type status: _, // Descriptions aren't used with this type description: _, // Length isn't tracked with this type length: _, extensions, } = head; Ok(( Self { subject, headers, payload, }, extensions, )) } fn into_head_and_payload(self) -> (naxum::Head, bytes::Bytes) { let Self { subject, headers, payload, } = self; ( Head { subject, // Replies aren't tracked with this type reply: None, headers, // Status isn't tracked with this type status: None, // Descriptions aren't used with this type description: None, // Length isn't tracked with this type length: 0, extensions: Extensions::new(), }, payload, ) } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server