Skip to main content
Glama
lib.rs (62.9 kB)
#![warn( clippy::unwrap_in_result, clippy::unwrap_used, clippy::panic, clippy::missing_panics_doc, clippy::panic_in_result_fn )] #![allow(clippy::missing_errors_doc)] use std::{ cmp, fmt::Debug, hash, io, ops, sync::Arc, time::Duration, }; use async_nats::{ ToServerAddrs, subject::ToSubject, }; use serde::{ Deserialize, Serialize, }; use telemetry::prelude::*; use telemetry_utils::metric; use thiserror::Error; use tokio::sync::Mutex; mod connect_options; mod message; mod subscriber; pub mod jetstream; pub mod service; pub use async_nats::{ self, Auth, AuthError, Error as InnerError, Event, HeaderName, HeaderValue, ServerAddr, ServerInfo, Subject, connection::State, header, header::HeaderMap, rustls, status, subject, }; pub use bytes::Bytes; pub use connect_options::ConnectOptions; pub use message::{ InnerMessage, Message, }; pub use subscriber::Subscriber; pub use telemetry::opentelemetry::Context as OpenTelemetryContext; pub type NatsError = Error; #[remain::sorted] #[derive(Debug, Error)] pub enum Error { #[error("io error: {0}")] Io(#[from] io::Error), #[error("nats connect error: {0}")] NatsConnect(#[from] async_nats::ConnectError), #[error("nats drain error: {0}")] NatsDrain(#[from] async_nats::client::DrainError), #[error("nats flush error: {0}")] NatsFlush(#[from] async_nats::client::FlushError), #[error("nats publish error: {0}")] NatsPublish(#[from] async_nats::PublishError), #[error("nats reconnect error: {0}")] NatsReconnect(#[from] async_nats::client::ReconnectError), #[error("nats request error: {0}")] NatsRequest(#[from] async_nats::RequestError), #[error("nats subscribe error: {0}")] NatsSubscribe(#[from] async_nats::SubscribeError), #[error("nats unsubscribe error: {0}")] NatsUnsubscribe(#[from] async_nats::UnsubscribeError), #[error("error serializing object: {0}")] Serialize(#[source] serde_json::Error), } pub type Result<T> = std::result::Result<T, Error>; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct NatsConfig { pub 
connection_name: Option<String>, #[serde(skip_serializing)] pub creds: Option<String>, #[serde(skip_serializing)] pub creds_file: Option<String>, pub subject_prefix: Option<String>, pub url: String, } impl Default for NatsConfig { fn default() -> Self { Self { connection_name: None, creds: None, creds_file: None, subject_prefix: None, url: "localhost".to_string(), } } } // Ensure that we only grab the current span if we're at debug level or lower, otherwise use none. // // When recording a parent span for long running tasks such as a transaction we want the direct // span parent. However, `Span::current()` returns a suitable parent span, according to the tracing // `Subscriber`, meaning that instead of capturing the transaction starting span, we might capture // a calling function up the stack that is at the info level or higher. In other words, then // "transaction span" might be an ancestor span unless we're really careful. macro_rules! current_span_for_debug { () => { Span::none() }; } pub type NatsClient = Client; #[derive(Clone, Debug)] pub struct Client { inner: async_nats::Client, metadata: Arc<ConnectionMetadata>, } impl Client { // TODO(fnichol): refactor pub fn as_inner(&self) -> &async_nats::Client { &self.inner } pub(crate) fn into_parts(self) -> (async_nats::Client, Arc<ConnectionMetadata>) { (self.inner, self.metadata) } #[instrument(name = "nats_client::new", skip_all, level = "debug")] pub async fn new(config: &NatsConfig) -> Result<Self> { let mut options = ConnectOptions::default(); if let Some(creds) = &config.creds { options = options.credentials(creds)?; } if let Some(creds_file) = &config.creds_file { options = options.credentials_file(creds_file).await?; } if let Some(connection_name) = &config.connection_name { options = options.name(connection_name); } // Add metrics for reconnect delay callback. It uses the default reconnect delay callback // code with new metrics. 
// Source: https://docs.rs/async-nats/latest/src/async_nats/connector.rs.html#84-92 metric!(counter.nats.reconnect_delay_callback.multiple_attempts = 0); options = options.reconnect_delay_callback(|attempts| { if attempts <= 1 { Duration::from_millis(0) } else { metric!(counter.nats.reconnect_delay_callback.multiple_attempts = 1); let exp: u32 = (attempts - 1).try_into().unwrap_or(u32::MAX); let max = Duration::from_secs(4); cmp::min(Duration::from_millis(2_u64.saturating_pow(exp)), max) } }); // Reset all of the metrics before setting up the event callback. metric!(counter.nats.event_callback.connected = 0); metric!(counter.nats.event_callback.disconnected = 0); metric!(counter.nats.event_callback.closed = 0); metric!(counter.nats.event_callback.lame_duck_mode = 0); metric!(counter.nats.event_callback.draining = 0); metric!(counter.nats.event_callback.slow_consumer = 0); metric!( counter .nats .event_callback .server_error .authorization_violation = 0 ); metric!(counter.nats.event_callback.server_error.slow_consumer = 0); metric!(counter.nats.event_callback.server_error.other = 0); metric!(counter.nats.event_callback.client_error.max_reconnects = 0); metric!(counter.nats.event_callback.client_error.other = 0); options = options.event_callback(|event| async move { match event { Event::Connected => metric!(counter.nats.event_callback.connected = 1), Event::Disconnected => metric!(counter.nats.event_callback.disconnected = 1), Event::Closed => metric!(counter.nats.event_callback.closed = 1), Event::LameDuckMode => metric!(counter.nats.event_callback.lame_duck_mode = 1), Event::Draining => metric!(counter.nats.event_callback.draining = 1), Event::SlowConsumer(sid) => { metric!(counter.nats.event_callback.slow_consumer = 1, sid = sid) } Event::ServerError(async_nats::ServerError::AuthorizationViolation) => { metric!( counter .nats .event_callback .server_error .authorization_violation = 1 ) } Event::ServerError(async_nats::ServerError::SlowConsumer(sid)) => { metric!( 
counter.nats.event_callback.server_error.slow_consumer = 1, sid = sid ) } Event::ServerError(async_nats::ServerError::Other(err_string)) => { metric!( counter.nats.event_callback.server_error.other = 1, si_error_message = err_string ) } Event::ClientError(async_nats::ClientError::MaxReconnects) => { metric!(counter.nats.event_callback.client_error.max_reconnects = 1) } Event::ClientError(async_nats::ClientError::Other(err_string)) => { metric!( counter.nats.event_callback.client_error.other = 1, si_error_message = err_string ) } } }); Self::connect_with_options(&config.url, config.subject_prefix.clone(), options).await } /// Returns the default timeout for requests set when creating the client. /// /// # Examples /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// println!("default request timeout: {:?}", client.timeout()); /// # Ok(()) /// # } /// ``` pub fn timeout(&self) -> Option<Duration> { self.inner.timeout() } /// Returns last received info from the server. /// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main () -> Result<(), async_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// println!("info: {:?}", client.server_info()); /// # Ok(()) /// # } /// ``` pub fn server_info(&self) -> async_nats::ServerInfo { self.inner.server_info() } /// Returns true if the server version is compatible with the version components. /// /// This has to be used with caution, as it is not guaranteed that the server that client is /// connected to is the same version that the one that is a JetStream meta/stream/consumer /// leader, especially across leafnodes. 
/// /// # Examples /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// assert!(client.is_server_compatible(2, 8, 4)); /// # Ok(()) /// # } /// ``` pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool { self.inner.is_server_compatible(major, minor, patch) } /// Publish a [Message] to a given subject. /// /// # Examples /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// client.publish("events.data", "payload".into()).await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.publish", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Producer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn publish<S: ToSubject>(&self, subject: S, payload: Bytes) 
-> Result<()> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", payload.len()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); self.inner .publish(subject, payload) .await .map_err(|err| span.record_err(Error::NatsPublish(err)))?; span.record_ok(); Ok(()) } /// Publish a [Message] with headers to a given subject. /// /// # Examples /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// use std::str::FromStr; /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let mut headers = async_nats::HeaderMap::new(); /// headers.insert( /// "X-Header", /// async_nats::HeaderValue::from_str("Value").unwrap(), /// ); /// client.publish_with_headers("events.data", headers, "payload".into()).await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.publish_with_headers", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Producer.as_str(), otel.name 
= Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn publish_with_headers<S: ToSubject>( &self, subject: S, headers: HeaderMap, payload: Bytes, ) -> Result<()> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", payload.len()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); self.inner .publish_with_headers(subject, headers, payload) .await .map_err(|err| span.record_err(Error::NatsPublish(err)))?; span.record_ok(); Ok(()) } /// Publish a [Message] to a given subject, with specified response subject to which the /// subscriber can respond. This method does not await for the response. /// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// client.publish_with_reply("events.data", "reply_subject", "payload".into()).await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.publish_with_reply", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), 
network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Producer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn publish_with_reply<S: ToSubject, R: ToSubject>( &self, subject: S, reply: R, payload: Bytes, ) -> Result<()> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", payload.len()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); self.inner .publish_with_reply(subject, reply, payload) .await .map_err(|err| span.record_err(Error::NatsPublish(err)))?; span.record_ok(); Ok(()) } /// Publish a [Message] to a given subject with headers and specified response subject to which /// the subscriber can respond. This method does not await for the response. 
/// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use std::str::FromStr; /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let mut headers = async_nats::HeaderMap::new(); /// client /// .publish_with_reply_and_headers( /// "events.data", /// "reply_subject", /// headers, /// "payload".into(), /// ) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.publish_with_reply_and_headers", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Producer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>( &self, subject: S, reply: R, headers: HeaderMap, payload: Bytes, ) -> Result<()> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", payload.len()); span.record( 
"otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); self.inner .publish_with_reply_and_headers(subject, reply, headers, payload) .await .map_err(|err| span.record_err(Error::NatsPublish(err)))?; span.record_ok(); Ok(()) } /// Sends the request with headers. /// /// # Examples /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let response = client.request("service", "data".into()).await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.request", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // TODO: maybe use this and inject in headers? // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), // similar to an RPC operation otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn request<S: ToSubject>(&self, subject: S, payload: Bytes) -> Result<Message> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); 
span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", payload.len()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); let msg = self .inner .request(subject, payload) .await .map_err(|err| span.record_err(Error::NatsRequest(err)))?; span.record_ok(); Ok(Message::new(msg, self.metadata.clone())) } /// Sends the request with headers. /// /// # Examples /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let mut headers = async_nats::HeaderMap::new(); /// headers.insert("Key", "Value"); /// let response = client /// .request_with_headers("service", headers, "data".into()) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.request_with_headers", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // TODO: maybe use this and inject in headers? 
// messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), // similar to an RPC operation otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn request_with_headers<S: ToSubject>( &self, subject: S, headers: HeaderMap, payload: Bytes, ) -> Result<Message> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", payload.len()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); let msg = self .inner .request_with_headers(subject, headers, payload) .await .map_err(|err| span.record_err(Error::NatsRequest(err)))?; span.record_ok(); Ok(Message::new(msg, self.metadata.clone())) } /// Sends the request created by the [Request]. 
/// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let request = si_data_nats::Request::new().payload("data".into()); /// let response = client.send_request("service", request).await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.send_request", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // TODO: maybe use this and inject in headers? // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), // similar to an RPC operation otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn send_request<S: ToSubject>( &self, subject: S, request: Request, ) -> Result<Message> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); if let Some(ref payload) = request.payload { span.record("messaging.message.body.size", payload.len()); } span.record( "otel.name", format!("{} {}", &subject, 
MessagingOperation::Publish.as_str()).as_str(), ); let msg = self .inner .send_request(subject, request.into()) .await .map_err(|err| span.record_err(Error::NatsRequest(err)))?; span.record_ok(); Ok(Message::new(msg, self.metadata.clone())) } /// Create a new globally unique inbox which can be used for replies. /// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// let nc = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let reply = nc.new_inbox(); /// let rsub = nc.subscribe(reply).await?; /// # Ok(()) /// # } /// ``` #[must_use] pub fn new_inbox(&self) -> String { self.inner.new_inbox() } /// Subscribes to a subject to receive [messages][Message]. /// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use futures::StreamExt; /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let mut subscription = client.subscribe("events.>").await?; /// while let Some(message) = subscription.next().await { /// println!("received message: {:?}", message); /// } /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.subscribe", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Receive.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = 
self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Consumer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Receive.as_str()).as_str(), ); let sub_subject = subject.clone(); let sub = self .inner .subscribe(sub_subject) .await .map_err(|err| span.record_err(Error::NatsSubscribe(err)))?; Ok(Subscriber::new( sub, &subject, self.metadata.clone(), current_span_for_debug!(), )) } /// Subscribes to a subject with a queue group to receive [messages][Message]. /// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// use futures::StreamExt; /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?; /// while let Some(message) = subscription.next().await { /// println!("received message: {:?}", message); /// } /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.queue_subscribe", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.nats.queue_group = Empty, messaging.operation = MessagingOperation::Receive.as_str(), messaging.system = 
self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Consumer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn queue_subscribe<S: ToSubject>( &self, subject: S, queue_group: String, ) -> Result<Subscriber> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Receive.as_str()).as_str(), ); span.record("messaging.nats.queue_group", queue_group.as_str()); let sub_subject = subject.clone(); let sub = self .inner .queue_subscribe(sub_subject, queue_group) .await .map_err(|err| span.record_err(Error::NatsSubscribe(err)))?; Ok(Subscriber::new( sub, &subject, self.metadata.clone(), current_span_for_debug!(), )) } /// Flushes the internal buffer ensuring that all messages are sent. 
/// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// client.flush().await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.flush", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), // similar to an RPC operation otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn flush(&self) -> Result<()> { let span = current_span_for_instrument_at!("debug"); self.inner .flush() .await .map_err(|err| span.record_err(Error::NatsFlush(err)))?; span.record_ok(); Ok(()) } /// Drains all subscriptions, stops any new messages from being published, and flushes any /// remaining messages, then closes the connection. 
/// /// Once completed, any associated streams associated with the client will be closed, and /// further client commands will fail /// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use futures::StreamExt; /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// let mut subscription = client.subscribe("events.>").await?; /// /// client.drain().await?; /// /// # // existing subscriptions are closed and further commands will fail /// assert!(subscription.next().await.is_none()); /// client /// .subscribe("events.>") /// .await /// .expect_err("Expected further commands to fail"); /// /// # Ok(()) /// # } /// ``` #[instrument( name = "nats_client.drain", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), // similar to an RPC operation otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn drain(&self) -> Result<()> { let span = current_span_for_instrument_at!("debug"); self.inner .drain() .await .map_err(|err| span.record_err(Error::NatsDrain(err)))?; span.record_ok(); Ok(()) } /// Returns the current state of the connection. 
///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = si_data_nats::Client::connect_with_options(
    ///     "demo.nats.io",
    ///     None,
    ///     Default::default(),
    /// ).await?;
    /// println!("connection state: {}", client.connection_state());
    /// # Ok(())
    /// # }
    /// ```
    pub fn connection_state(&self) -> State {
        self.inner.connection_state()
    }

    /// Forces the client to reconnect.
    ///
    /// Keep in mind that the client will reconnect automatically if the connection is lost and
    /// this method does not have to be used in normal circumstances.
    /// However, if you want to force the client to reconnect, for example to re-trigger
    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
    /// This method does not wait for the connection to be re-established.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// client.force_reconnect().await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(
        name = "nats_client.force_reconnect",
        skip_all,
        level = "debug",
        fields(
            messaging.client_id = self.metadata.messaging_client_id(),
            messaging.nats.server.id = self.metadata.messaging_nats_server_id(),
            messaging.nats.server.name = self.metadata.messaging_nats_server_name(),
            messaging.nats.server.version = self.metadata.messaging_nats_server_version(),
            messaging.system = self.metadata.messaging_system(),
            messaging.url = self.metadata.messaging_url(),
            network.peer.address = self.metadata.network_peer_address(),
            network.protocol.name = self.metadata.network_protocol_name(),
            network.protocol.version = self.metadata.network_protocol_version(),
            network.transport = self.metadata.network_transport(),
            otel.kind = SpanKind::Client.as_str(), // similar to an RPC operation
            otel.status_code = Empty,
            otel.status_message = Empty,
            server.address = self.metadata.server_address(),
            server.port = self.metadata.server_port(),
        )
    )]
    pub async fn force_reconnect(&self) -> Result<()> {
        let span = current_span_for_instrument_at!("debug");

        self.inner
            .force_reconnect()
            .await
            .map_err(|err| span.record_err(Error::NatsReconnect(err)))?;

        span.record_ok();
        Ok(())
    }

    /// Returns a struct representing statistics of the whole lifecycle of the client.
    ///
    /// This includes number of bytes sent/received, number of messages sent/received, and number
    /// of times the connection was established. As this returns [Arc] with [AtomicU64] fields, it
    /// can be safely reused and shared across threads.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = si_data_nats::Client::connect_with_options(
    ///     "demo.nats.io",
    ///     None,
    ///     Default::default(),
    /// ).await?;
    /// let statistics = client.statistics();
    /// println!("client statistics: {:#?}", statistics);
    /// # Ok(())
    /// # }
    /// ```
    pub fn statistics(&self) -> Arc<async_nats::Statistics> {
        self.inner.statistics()
    }
}

// API extensions
impl Client {
    /// Creates a new [`NatsTxn`] backed by a clone of this client.
    ///
    /// The transaction is handed a clone of the client's shared metadata together with the span
    /// produced by `current_span_for_debug!()` at this call site.
    #[instrument(
        name = "nats_client.transaction",
        skip_all,
        level = "debug",
        fields(
            messaging.client_id = self.metadata.messaging_client_id(),
            messaging.nats.server.id = self.metadata.messaging_nats_server_id(),
            messaging.nats.server.name = self.metadata.messaging_nats_server_name(),
            messaging.nats.server.version = self.metadata.messaging_nats_server_version(),
            messaging.system = self.metadata.messaging_system(),
            messaging.url = self.metadata.messaging_url(),
            messaging.x.transaction = Empty,
            network.peer.address = self.metadata.network_peer_address(),
            network.protocol.name = self.metadata.network_protocol_name(),
            network.protocol.version = self.metadata.network_protocol_version(),
            network.transport = self.metadata.network_transport(),
            otel.kind = SpanKind::Internal.as_str(),
            otel.status_code = Empty,
            otel.status_message = Empty,
            server.address = self.metadata.server_address(),
            server.port = self.metadata.server_port(),
        )
    )]
    pub fn transaction(&self) -> NatsTxn {
        NatsTxn::new(
            self.clone(),
            self.metadata.clone(),
            current_span_for_debug!(),
        )
    }

    /// Establish a `Connection` with a NATS server.
    ///
    /// Multiple servers may be specified by separating them with commas.
    ///
    /// The given `subject_prefix` is stored in the resulting client's [`ConnectionMetadata`], and
    /// `options` supplies the underlying connection settings used to connect.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use si_data_nats::{Client, ConnectOptions};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), si_data_nats::Error> {
    /// let nc = Client::connect_with_options(
    ///     "demo.nats.io",
    ///     None,
    ///     ConnectOptions::default(),
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// In the below case, the second server is configured to use TLS but the first one is not.
    /// Using the `tls_required` method can ensure that all servers are connected to with TLS, if
    /// that is your intention.
    ///
    /// ```no_run
    /// # use si_data_nats::{Client, ConnectOptions};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), si_data_nats::Error> {
    /// let nc = Client::connect_with_options(
    ///     "nats://demo.nats.io:4222,tls://demo.nats.io:4443",
    ///     None,
    ///     ConnectOptions::default(),
    /// )
    /// .await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(
        name = "nats_client::connect_with_options",
        skip_all,
        level = "debug",
        fields(
            messaging.client_id = Empty,
            messaging.nats.server.id = Empty,
            messaging.nats.server.name = Empty,
            messaging.nats.server.version = Empty,
            messaging.system = Empty,
            messaging.url = Empty,
            network.peer.address = Empty,
            network.protocol.name = Empty,
            network.protocol.version = Empty,
            network.transport = Empty,
            otel.kind = SpanKind::Client.as_str(),
            otel.status_code = Empty,
            otel.status_message = Empty,
            server.address = Empty,
            server.port = Empty,
        )
    )]
    pub async fn connect_with_options(
        addrs: impl ToServerAddrs,
        subject_prefix: Option<String>,
        options: ConnectOptions,
    ) -> Result<Self> {
        let addrs = addrs.to_server_addrs()?.collect::<Vec<_>>();

        let messaging_system = "nats";
        // All server addresses joined into one comma-separated string for the span/metadata.
        let messaging_url = addrs
            .clone()
            .into_iter()
            .map(|a| a.into_inner().into())
            .collect::<Vec<String>>()
            .join(",");
        let network_protocol_name = "nats";
        let network_transport = "ip_tcp";

        let span = current_span_for_instrument_at!("debug");
        // These fields are known before connecting, so they are recorded up front and remain on
        // the span even when the connection attempt below fails.
        span.record("messaging.system", messaging_system);
        span.record("messaging.url", messaging_url.as_str());
        span.record("network.protocol.name", network_protocol_name);
        span.record("network.transport", network_transport);

        let inner = options
            .inner
            .connect(addrs)
            .await
            .map_err(|err| span.record_err(Error::NatsConnect(err)))?;
        debug!("successfully connected to nats");

        // The remaining metadata is taken from the server info reported by the connected client.
        let server_info = inner.server_info();

        let metadata = ConnectionMetadata {
            messaging_client_id: server_info.client_id.to_string(),
            messaging_system,
            messaging_url,
            messaging_nats_server_id: server_info.server_id,
            messaging_nats_server_name: server_info.server_name,
            messaging_nats_server_version: server_info.version,
            network_peer_address: server_info.client_ip,
            network_transport,
            network_protocol_name,
            network_protocol_version: server_info.proto.to_string(),
            server_address: server_info.host,
            server_port: server_info.port,
            subject_prefix,
        };

        // Fill in the span fields that were declared `Empty` in the `#[instrument]` attribute.
        span.record("messaging.client_id", metadata.messaging_client_id.as_str());
        span.record(
            "messaging.nats.server.id",
            metadata.messaging_nats_server_id.as_str(),
        );
        span.record(
            "messaging.nats.server.name",
            metadata.messaging_nats_server_name.as_str(),
        );
        span.record(
            "messaging.nats.server.version",
            metadata.messaging_nats_server_version.as_str(),
        );
        span.record(
            "network.peer.address",
            metadata.network_peer_address.as_str(),
        );
        span.record(
            "network.protocol.version",
            metadata.network_protocol_version.as_str(),
        );
        span.record("server.address", metadata.server_address.as_str());
        span.record("server.port", metadata.server_port);

        span.record_ok();
        Ok(Self {
            inner,
            metadata: Arc::new(metadata),
        })
    }

    /// Gets a reference to the client's metadata.
    pub fn metadata(&self) -> &ConnectionMetadata {
        self.metadata.as_ref()
    }

    /// Gets a cloned [`Arc`] handle to the client's metadata.
pub fn metadata_clone(&self) -> Arc<ConnectionMetadata> {
        Arc::clone(&self.metadata)
    }
}

/// Connection details captured when a [`Client`] connects.
///
/// The values are used to populate the fields of the tracing spans emitted by this crate.
#[derive(Clone, Debug)]
pub struct ConnectionMetadata {
    messaging_client_id: String,
    messaging_nats_server_id: String,
    messaging_nats_server_name: String,
    messaging_nats_server_version: String,
    messaging_system: &'static str,
    messaging_url: String,
    network_peer_address: String,
    network_protocol_name: &'static str,
    network_protocol_version: String,
    network_transport: &'static str,
    server_address: String,
    server_port: u16,
    subject_prefix: Option<String>,
}

impl ConnectionMetadata {
    /// Gets a reference to the connection metadata's messaging client id.
    pub fn messaging_client_id(&self) -> &str {
        self.messaging_client_id.as_str()
    }

    /// Gets a reference to the connection metadata's messaging nats server id.
    pub fn messaging_nats_server_id(&self) -> &str {
        self.messaging_nats_server_id.as_str()
    }

    /// Gets a reference to the connection metadata's messaging nats server name.
    pub fn messaging_nats_server_name(&self) -> &str {
        self.messaging_nats_server_name.as_str()
    }

    /// Gets a reference to the connection metadata's messaging nats server version.
    pub fn messaging_nats_server_version(&self) -> &str {
        self.messaging_nats_server_version.as_str()
    }

    /// Gets a reference to the connection metadata's messaging system.
    pub fn messaging_system(&self) -> &str {
        self.messaging_system
    }

    /// Gets a reference to the connection metadata's messaging url.
    pub fn messaging_url(&self) -> &str {
        self.messaging_url.as_str()
    }

    /// Gets a reference to the connection metadata's network peer address.
    pub fn network_peer_address(&self) -> &str {
        self.network_peer_address.as_str()
    }

    /// Gets a reference to the connection metadata's network protocol name.
    pub fn network_protocol_name(&self) -> &str {
        self.network_protocol_name
    }

    /// Gets a reference to the connection metadata's network protocol version.
    pub fn network_protocol_version(&self) -> &str {
        self.network_protocol_version.as_str()
    }

    /// Gets a reference to the connection metadata's network transport.
    pub fn network_transport(&self) -> &str {
        self.network_transport
    }

    /// Gets a reference to the connection metadata's server address.
    pub fn server_address(&self) -> &str {
        self.server_address.as_str()
    }

    /// Gets a reference to the connection metadata's server port.
    pub fn server_port(&self) -> u16 {
        self.server_port
    }

    /// Gets the common prefix for use on all subjects.
    pub fn subject_prefix(&self) -> Option<&str> {
        self.subject_prefix.as_deref()
    }
}

/// A batch of NATS publishes that is held back until the transaction is committed.
///
/// Messages queued with [`NatsTxn::publish`] are buffered and only sent by
/// [`NatsTxn::commit`] / [`NatsTxn::commit_into_conn`]. Clones of a transaction share the same
/// buffer, as it is held behind an [`Arc`].
#[derive(Clone, Debug)]
pub struct NatsTxn {
    client: Client,
    pending_publish: Arc<Mutex<Vec<(Subject, serde_json::Value)>>>,
    metadata: Arc<ConnectionMetadata>,
    tx_span: Span,
}

impl NatsTxn {
    /// Builds a transaction with an empty pending-publish buffer.
    fn new(client: Client, metadata: Arc<ConnectionMetadata>, tx_span: Span) -> Self {
        Self {
            client,
            pending_publish: Arc::new(Mutex::new(Vec::new())),
            metadata,
            tx_span,
        }
    }

    /// Serializes `object` to JSON and queues it for publishing on `subject`.
    ///
    /// Nothing is sent to NATS until the transaction is committed. A serialization failure is
    /// recorded on both this call's span and the transaction span and returned as
    /// [`Error::Serialize`].
    #[instrument(
        name = "nats_txn.publish",
        skip_all,
        level = "debug",
        fields(
            messaging.destination.name = Empty,
            otel.kind = SpanKind::Internal.as_str(),
            otel.status_code = Empty,
            otel.status_message = Empty,
        )
    )]
    pub async fn publish<T>(&self, subject: impl ToSubject, object: &T) -> Result<()>
    where
        T: Serialize + Debug,
    {
        let span = current_span_for_instrument_at!("debug");
        span.follows_from(&self.tx_span);

        let subject = subject.to_subject();
        span.record("messaging.destination.name", subject.as_str());

        let json: serde_json::Value = serde_json::to_value(object)
            .map_err(|err| span.record_err(self.tx_span.record_err(Error::Serialize(err))))?;
        let mut pending_publish = self.pending_publish.lock().await;
        pending_publish.push((subject, json));

        // Mark the span as successful, matching the other instrumented methods that declare
        // `otel.status_code`; without this the field would stay empty on the happy path.
        span.record_ok();
        Ok(())
    }

    /// Serializes `object` to JSON and publishes it on `subject` right away, bypassing the
    /// pending-publish buffer.
    ///
    /// Serialization and publish failures are recorded on both this call's span and the
    /// transaction span before being returned.
    #[instrument(
        name = "nats_txn.publish_immediately",
        skip_all,
        level = "debug",
        fields(
            messaging.destination.name = Empty,
            otel.kind = SpanKind::Internal.as_str(),
            otel.status_code = Empty,
            otel.status_message = Empty,
        )
    )]
    pub async fn publish_immediately<T>(&self, subject: impl ToSubject, object: &T) -> Result<()>
    where
        T: Serialize + Debug,
    {
        let span = current_span_for_instrument_at!("debug");
        span.follows_from(&self.tx_span);

        let subject = subject.to_subject();
        span.record("messaging.destination.name", subject.as_str());

        // Go through `serde_json::Value` first so the bytes sent here are produced the same way
        // as for messages that are queued and later sent on commit.
        let json: serde_json::Value = serde_json::to_value(object)
            .map_err(|err| span.record_err(self.tx_span.record_err(Error::Serialize(err))))?;
        let msg = serde_json::to_vec(&json)
            .map_err(|err| span.record_err(self.tx_span.record_err(Error::Serialize(err))))?;
        self.client
            .publish(subject, msg.into())
            .await
            .map_err(|err| span.record_err(self.tx_span.record_err(err)))?;

        // Mark the span as successful, matching the other instrumented methods that declare
        // `otel.status_code`; without this the field would stay empty on the happy path.
        span.record_ok();
        Ok(())
    }

    /// Publishes every queued message, in the order it was queued, and returns the client.
    ///
    /// On the first serialization or publish failure the error is recorded on both spans and
    /// returned; the messages that were still queued are discarded when the drain is dropped.
    #[instrument(
        name = "nats_txn.commit_into_conn",
        skip_all,
        level = "debug",
        fields(
            otel.kind = SpanKind::Internal.as_str(),
            otel.status_code = Empty,
            otel.status_message = Empty,
        )
    )]
    pub async fn commit_into_conn(self) -> Result<Client> {
        let span = current_span_for_instrument_at!("debug");
        span.follows_from(&self.tx_span);

        let mut pending_publish = self.pending_publish.lock_owned().await;
        for (subject, object) in pending_publish.drain(..) {
            let msg = serde_json::to_vec(&object)
                .map_err(|err| span.record_err(self.tx_span.record_err(Error::Serialize(err))))?;
            self.client
                .publish(subject, msg.into())
                .await
                .map_err(|err| span.record_err(self.tx_span.record_err(err)))?;
        }

        self.tx_span.record_ok();
        self.tx_span.record("messaging.x.transaction", "commit");

        span.record_ok();
        Ok(self.client)
    }

    /// Publishes every queued message and discards the returned client.
    ///
    /// See [`NatsTxn::commit_into_conn`].
    #[instrument(
        name = "nats_txn.commit",
        skip_all,
        level = "debug",
        fields(
            otel.kind = SpanKind::Internal.as_str(),
        )
    )]
    pub async fn commit(self) -> Result<()> {
        let _ = self.commit_into_conn().await?;
        Ok(())
    }

    /// Abandons the transaction without publishing anything and returns the client.
    #[instrument(
        name = "nats_txn.rollback_into_conn",
        skip_all,
        level = "debug",
        fields(
            otel.kind = SpanKind::Internal.as_str(),
            otel.status_code = Empty,
            otel.status_message = Empty,
        )
    )]
    pub async fn rollback_into_conn(self) -> Result<Client> {
        let span = current_span_for_instrument_at!("debug");
        span.follows_from(&self.tx_span);

        // Nothing much to do: this function consumes `self`, so this handle's reference to the
        // pending publishes is dropped when it returns. The buffer sits behind an `Arc`, so if
        // clones of this transaction exist it is only freed once the last of them is dropped.

        self.tx_span.record_ok();
        self.tx_span.record("messaging.x.transaction", "rollback");

        span.record_ok();
        Ok(self.client)
    }

    /// Abandons the transaction without publishing anything and discards the returned client.
    ///
    /// See [`NatsTxn::rollback_into_conn`].
    #[instrument(
        name = "nats_txn.rollback",
        skip_all,
        level = "debug",
        fields(
            otel.kind = SpanKind::Internal.as_str(),
        )
    )]
    pub async fn rollback(self) -> Result<()> {
        let _ = self.rollback_into_conn().await?;
        Ok(())
    }

    /// Gets a reference to the nats txn's metadata.
    pub fn metadata(&self) -> &ConnectionMetadata {
        self.metadata.as_ref()
    }
}

/// Used for building customized requests.
///
/// Note: this type is wrapped in order to expose the inner fields to this wrapping module.
#[derive(Default)]
pub struct Request {
    payload: Option<Bytes>,
    headers: Option<HeaderMap>,
    timeout: Option<Option<Duration>>,
    inbox: Option<String>,
}

impl Request {
    /// Creates a request builder with no payload, headers, timeout or inbox set.
    pub fn new() -> Request {
        Self::default()
    }

    /// Sets the payload of the request. If not used, empty payload will be sent.
///
    /// # Examples
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), si_data_nats::Error> {
    /// let client = si_data_nats::ConnectOptions::new().connect("demo.nats.io", None).await?;
    /// let request = si_data_nats::Request::new().payload("data".into());
    /// client.send_request("service", request).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn payload(self, payload: Bytes) -> Request {
        Request {
            payload: Some(payload),
            ..self
        }
    }

    /// Attaches the given headers to the request.
    ///
    /// # Examples
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), si_data_nats::Error> {
    /// use std::str::FromStr;
    /// let client = si_data_nats::ConnectOptions::new().connect("demo.nats.io", None).await?;
    /// let mut headers = si_data_nats::HeaderMap::new();
    /// headers.insert(
    ///     "X-Example",
    ///     si_data_nats::HeaderValue::from_str("Value").unwrap(),
    /// );
    /// let request = si_data_nats::Request::new()
    ///     .headers(headers)
    ///     .payload("data".into());
    /// client.send_request("service", request).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn headers(self, headers: HeaderMap) -> Request {
        Request {
            headers: Some(headers),
            ..self
        }
    }

    /// Sets a custom timeout for this request, overriding the default [Client] timeout.
    ///
    /// Passing [Option::None] disables the timeout entirely, which might result in deadlock.
    /// To keep the default timeout, simply do not call this function.
    ///
    /// # Examples
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), si_data_nats::Error> {
    /// let client = si_data_nats::ConnectOptions::new().connect("demo.nats.io", None).await?;
    /// let request = si_data_nats::Request::new()
    ///     .timeout(Some(std::time::Duration::from_secs(15)))
    ///     .payload("data".into());
    /// client.send_request("service", request).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn timeout(self, timeout: Option<Duration>) -> Request {
        Request {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
/// /// # Examples /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// use std::str::FromStr; /// let client = si_data_nats::ConnectOptions::new().connect("demo.nats.io", None).await?; /// let request = si_data_nats::Request::new() /// .inbox("custom_inbox".into()) /// .payload("data".into()); /// client.send_request("service", request).await?; /// # Ok(()) /// # } /// ``` pub fn inbox(mut self, inbox: String) -> Request { self.inbox = Some(inbox); self } } impl From<Request> for async_nats::Request { fn from(value: Request) -> Self { let mut r = Self::new(); if let Some(payload) = value.payload { r = r.payload(payload); } if let Some(headers) = value.headers { r = r.headers(headers); } if let Some(timeout) = value.timeout { r = r.timeout(timeout); } if let Some(inbox) = value.inbox { r = r.inbox(inbox); } r } } /// A hashable [`Subject`]. /// /// Note: an upstream change to `Subject` to derive `Hash` would avoid this type. #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct HashableSubject(Subject); impl hash::Hash for HashableSubject { fn hash<H: hash::Hasher>(&self, state: &mut H) { self.0.hash(state); } } impl ops::Deref for HashableSubject { type Target = Subject; fn deref(&self) -> &Self::Target { &self.0 } } impl From<Subject> for HashableSubject { fn from(value: Subject) -> Self { Self(value) } } impl ToSubject for HashableSubject { fn to_subject(&self) -> Subject { self.0.clone() } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.