Skip to main content
Glama
lib.rs5.62 kB
//! This crate contains [`Multiplexer`], which provides the ability to only use one [NATS](https://nats.io) //! subscription (usually with wildcard(s) in the subject) to manage receiving on multiple channels. #![warn( bad_style, clippy::missing_panics_doc, clippy::panic, clippy::panic_in_result_fn, clippy::unwrap_in_result, clippy::unwrap_used, dead_code, improper_ctypes, missing_debug_implementations, missing_docs, no_mangle_generic_items, non_shorthand_field_patterns, overflowing_literals, path_statements, patterns_in_fns_without_body, rust_2018_idioms, unconditional_recursion, unreachable_pub, unused, unused_allocation, unused_comparisons, unused_parens, while_true )] use std::{ collections::HashMap, fmt::Debug, }; use futures::StreamExt; use nats_multiplexer_client::MultiplexerClient; use nats_multiplexer_core::{ MultiplexerKey, MultiplexerRequest, }; use si_data_nats::{ Message, NatsClient, Subject, Subscriber, subject::ToSubject, }; use telemetry::prelude::*; use thiserror::Error; use tokio::sync::{ broadcast, mpsc, }; use tokio_util::sync::CancellationToken; /// The buffer used for senders within the [`Multiplexer's`] channels map. const MULTIPLEXER_BROADCAST_SENDER_BUFFER: usize = 4096; // NOTE(nick): this module is intentionally private. mod parsing; #[allow(missing_docs)] #[remain::sorted] #[derive(Debug, Error)] pub enum MultiplexerError { #[error("nats error: {0}")] Nats(#[from] si_data_nats::Error), } #[allow(missing_docs)] pub type MultiplexerResult<T> = Result<T, MultiplexerError>; /// A [NATS](https://nats.io) multiplexer, will contains a subscription to one subject and contains a map of channels /// for those wishing to receive from the same or more-specific subjects. #[derive(Debug)] pub struct Multiplexer { subject: Subject, subscriber: Subscriber, channels: HashMap<MultiplexerKey, broadcast::Sender<Message>>, client_rx: mpsc::UnboundedReceiver<MultiplexerRequest>, token: CancellationToken, } impl Multiplexer { const NAME: &'static str = "nats_multiplexter::multiplexer"; /// Creates a new [`Multiplexer`]. pub async fn new( nats: &NatsClient, subject: impl ToSubject, token: CancellationToken, ) -> MultiplexerResult<(Self, MultiplexerClient)> { let subject = subject.to_subject(); let subscriber = nats.subscribe(subject.clone()).await?; let (client_tx, client_rx) = mpsc::unbounded_channel(); Ok(( Self { subscriber, channels: Default::default(), client_rx, subject, token, }, MultiplexerClient::new(client_tx), )) } /// Runs the [`Multiplexer`] with a given shutdown receiver. pub async fn run(mut self) { debug!(%self.subject, "running channel multiplexer"); loop { tokio::select! { Some(message) = self.subscriber.next() => { if let Err(e) = self.process_message(message) { error!("{e}"); } } Some(request) = self.client_rx.recv() => { if let Err(e) = self.process_client_request(request) { error!("{e}"); } } _ = self.token.cancelled() => { info!( task = Self::NAME, subject = %self.subject, "received cancellation", ); // NOTE(nick,fletcher): we may not want to unsubscribe here. if let Err(e) = self.subscriber.unsubscribe().await { error!("{e}"); } break; }, } } debug!(task = Self::NAME, subject = %self.subject, "shutdown complete"); } fn process_message(&self, message: Message) -> MultiplexerResult<()> { let subject = message.subject().to_string(); // We need to fan out not only to those receiving for the literal subject, but also for those using wildcards. // That is just wild! for key in parsing::keys_for_potential_receivers(subject.clone()) { if let Some(sender) = self.channels.get(&key) { trace!(%subject, %key, "sending message for receiver corresponding to key"); if sender.send(message.clone()).is_err() { trace!(%subject, %key, "unable to send message (likely there are no receivers left)"); } } } Ok(()) } fn process_client_request(&mut self, request: MultiplexerRequest) -> MultiplexerResult<()> { debug!(?request, "found request"); match request { MultiplexerRequest::Add((subject, reply_tx)) => { // NOTE(nick): major props to fnichol for this idea. let sender = self.channels.entry(subject).or_insert_with(|| { let (sender, _) = broadcast::channel(MULTIPLEXER_BROADCAST_SENDER_BUFFER); sender }); // NOTE(nick): this returns what it couldn't send when erroring. if reply_tx.send(sender.subscribe()).is_err() { error!("could not process client request"); } } } Ok(()) } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server