Skip to main content
Glama
context.rs59.4 kB
use std::{ borrow::Borrow, sync::Arc, time::Duration, }; use async_nats::{ HeaderMap, HeaderValue, header::{ self, IntoHeaderName, IntoHeaderValue, }, jetstream::{ account::Account, consumer::{ Consumer, FromConsumer, IntoConsumerConfig, }, context::{ AccountError, CreateKeyValueError, CreateObjectStoreError, CreateStreamError, DeleteObjectStore, DeleteStreamError, GetStreamByNameError, GetStreamError, KeyValueError, ObjectStoreError, PublishAckFuture, PublishError, RequestError, UpdateStreamError, }, stream::{ Config, ConsumerError, DeleteStatus, Info, Stream, }, }, subject::ToSubject, }; use bytes::Bytes; use serde::{ Serialize, de::DeserializeOwned, }; use telemetry::prelude::*; use crate::{ Client, ConnectionMetadata, }; /// A context which can perform jetstream scoped requests. #[derive(Debug, Clone)] pub struct Context { inner: async_nats::jetstream::Context, metadata: Arc<ConnectionMetadata>, } impl Context { // TODO(fnichol): refactor pub fn as_inner(&self) -> &async_nats::jetstream::Context { &self.inner } pub(crate) fn new(client: Client) -> Self { let (inner_client, metadata) = client.into_parts(); let inner = async_nats::jetstream::new(inner_client); Self { inner, metadata } } pub fn set_timeout(&mut self, timeout: Duration) { self.inner.set_timeout(timeout) } pub fn metadata(&self) -> &ConnectionMetadata { &self.metadata } pub(crate) fn with_prefix(client: Client, prefix: &str) -> Self { let (inner_client, metadata) = client.into_parts(); let inner = async_nats::jetstream::with_prefix(inner_client, prefix); Self { inner, metadata } } pub(crate) fn with_domain<T: AsRef<str>>(client: Client, domain: T) -> Self { let (inner_client, metadata) = client.into_parts(); let inner = async_nats::jetstream::with_domain(inner_client, domain); Self { inner, metadata } } /// Publishes [`jetstream::Message`][async_nats::jetstream::message::Message] to the [`Stream`] /// without waiting for acknowledgment from the server that the message has been successfully /// delivered. /// /// Acknowledgment future that can be polled is returned instead. /// /// If the stream does not exist, `no responders` error will be returned. /// /// # Examples /// /// Publish, and after each publish, await for acknowledgment. /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let ack = jetstream.publish("events", "data".into()).await?; /// ack.await?; /// jetstream.publish("events", "data".into()).await?.await?; /// # Ok(()) /// # } /// ``` /// /// Publish and do not wait for the acknowledgment. Await can be deferred to when needed or /// ignored entirely. /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let first_ack = jetstream.publish("events", "data".into()).await?; /// let second_ack = jetstream.publish("events", "data".into()).await?; /// first_ack.await?; /// second_ack.await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.publish", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Producer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn publish<S: ToSubject>( &self, subject: S, payload: Bytes, ) -> Result<PublishAckFuture, PublishError> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", payload.len()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); let fut = self .inner .publish(subject, payload) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(fut) } /// Publish a message with headers to a given subject associated with a stream and returns an /// acknowledgment from the server that the message has been successfully delivered. /// /// If the stream does not exist, `no responders` error will be returned. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let mut headers = async_nats::HeaderMap::new(); /// headers.append("X-key", "Value"); /// let ack = jetstream /// .publish_with_headers("events", headers, "data".into()) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.publish_with_headers", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Producer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn publish_with_headers<S: ToSubject>( &self, subject: S, headers: async_nats::header::HeaderMap, payload: Bytes, ) -> Result<PublishAckFuture, PublishError> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", payload.len()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); let fut = self .inner .publish_with_headers(subject, headers, payload) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(fut) } /// Publish a message built by [Publish] and returns an acknowledgment future. /// /// If the stream does not exist, `no responders` error will be returned. /// /// # Examples /// /// ```no_run /// # use async_nats::jetstream::context::Publish; /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let ack = jetstream /// .send_publish( /// "events", /// Publish::build().payload("data".into()).message_id("uuid"), /// ) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.send_publish", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.message.body.size = Empty, // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Producer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn send_publish<S: ToSubject>( &self, subject: S, publish: Publish, ) -> Result<PublishAckFuture, PublishError> { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record("messaging.message.body.size", publish.payload.len()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); let fut = self .inner .send_publish(subject, publish.into()) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(fut) } /// Query the server for account information. #[instrument( name = "context.query_account", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn query_account(&self) -> Result<Account, AccountError> { let span = current_span_for_instrument_at!("debug"); let account = self .inner .query_account() .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(account) } /// Create a JetStream [`Stream`] with given config and return a handle to it. /// /// That handle can be used to manage and use [`Consumer`]. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use async_nats::jetstream::stream::Config; /// use async_nats::jetstream::stream::DiscardPolicy; /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let stream = jetstream /// .create_stream(Config { /// name: "events".to_string(), /// max_messages: 100_000, /// discard: DiscardPolicy::Old, /// ..Default::default() /// }) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.create_stream", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn create_stream<S>( &self, stream_config: S, ) -> Result<Stream<Info>, CreateStreamError> where Config: From<S>, { let span = current_span_for_instrument_at!("debug"); let stream = self .inner .create_stream(stream_config) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(stream) } /// Checks for [`Stream`] existence on the server and returns handle to it. /// /// That handle can be used to manage and use [`Consumer`]. This variant does not fetch /// [`Stream`] info from the server. It means it does not check if the stream actually exists. /// If you run more operations on few streams, it is better to use [`Context::get_stream`] /// instead. If you however run single operations on many streams, this method is more /// efficient. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let stream = jetstream.get_stream_no_info("events").await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.get_stream_no_info", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn get_stream_no_info<T: AsRef<str>>( &self, stream: T, ) -> Result<Stream<()>, GetStreamError> { let span = current_span_for_instrument_at!("debug"); let stream = self .inner .get_stream_no_info(stream) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(stream) } /// Checks for [`Stream`] existence on the server and returns handle to it. /// That handle can be used to manage and use [`Consumer`]. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let stream = jetstream.get_stream("events").await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.get_stream", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> { let span = current_span_for_instrument_at!("debug"); let stream = self .inner .get_stream(stream) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(stream) } /// Create a stream with the given configuration on the server if it is not present. /// /// Returns a handle to the stream on the server. /// /// Note: This does not validate if the Stream on the server is compatible with the /// configuration passed in. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use async_nats::jetstream::stream::Config; /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let stream = jetstream /// .get_or_create_stream(Config { /// name: "events".to_string(), /// max_messages: 10_000, /// ..Default::default() /// }) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.get_or_create_stream", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn get_or_create_stream<S>( &self, stream_config: S, ) -> Result<Stream, CreateStreamError> where S: Into<Config>, { let span = current_span_for_instrument_at!("debug"); let stream_config = stream_config.into(); trace!(?stream_config); let stream = self .inner .get_or_create_stream(stream_config) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(stream) } /// Deletes a [`Stream`] with a given name. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use async_nats::jetstream::stream::Config; /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let stream = jetstream.delete_stream("events").await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.delete_stream", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn delete_stream<T: AsRef<str>>( &self, stream: T, ) -> Result<async_nats::jetstream::stream::DeleteStatus, DeleteStreamError> { let span = current_span_for_instrument_at!("debug"); let status = self .inner .delete_stream(stream) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(status) } /// Updates a [`Stream`] with a given config. If specific field cannot be updated, error is /// returned. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use async_nats::jetstream::stream::Config; /// use async_nats::jetstream::stream::DiscardPolicy; /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let stream = jetstream /// .update_stream(&Config { /// name: "events".to_string(), /// discard: DiscardPolicy::New, /// max_messages: 50_000, /// ..Default::default() /// }) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.update_stream", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn update_stream<S>( &self, config: S, ) -> Result<async_nats::jetstream::stream::Info, UpdateStreamError> where S: Borrow<Config>, { let span = current_span_for_instrument_at!("debug"); let info = self .inner .update_stream(config) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(info) } /// Looks up Stream that contains provided subject. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use futures::TryStreamExt; /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let stream_name = jetstream.stream_by_subject("foo.>"); /// # Ok(()) /// # } /// ``` #[instrument( name = "context.stream_by_subject", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn stream_by_subject<T: Into<String>>( &self, subject: T, ) -> Result<String, GetStreamByNameError> { let span = current_span_for_instrument_at!("debug"); let subject = subject.into(); span.record("messaging.destination.name", subject.as_str()); let name = self .inner .stream_by_subject(subject) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(name) } /// Lists names of all streams for current context. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use futures::TryStreamExt; /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let mut names = jetstream.stream_names(); /// while let Some(stream) = names.try_next().await? { /// println!("stream: {}", stream); /// } /// # Ok(()) /// # } /// ``` pub fn stream_names(&self) -> async_nats::jetstream::context::StreamNames { self.inner.stream_names() } /// Lists all streams info for current context. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use futures::TryStreamExt; /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let mut streams = jetstream.streams(); /// while let Some(stream) = streams.try_next().await? { /// println!("stream: {:?}", stream); /// } /// # Ok(()) /// # } /// ``` pub fn streams(&self) -> async_nats::jetstream::context::Streams { self.inner.streams() } /// Returns an existing key-value bucket. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let kv = jetstream.get_key_value("bucket").await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.get_key_value", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn get_key_value<T: Into<String>>( &self, bucket: T, ) -> Result<async_nats::jetstream::kv::Store, KeyValueError> { let span = current_span_for_instrument_at!("debug"); let store = self .inner .get_key_value(bucket) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(store) } /// Creates a new key-value bucket. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let kv = jetstream /// .create_key_value(async_nats::jetstream::kv::Config { /// bucket: "kv".to_string(), /// history: 10, /// ..Default::default() /// }) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.create_key_value", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn create_key_value( &self, config: async_nats::jetstream::kv::Config, ) -> Result<async_nats::jetstream::kv::Store, CreateKeyValueError> { let span = current_span_for_instrument_at!("debug"); let store = self .inner .create_key_value(config) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(store) } /// Deletes given key-value bucket. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let kv = jetstream /// .create_key_value(async_nats::jetstream::kv::Config { /// bucket: "kv".to_string(), /// history: 10, /// ..Default::default() /// }) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.delete_key_value", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn delete_key_value<T: AsRef<str>>( &self, bucket: T, ) -> Result<async_nats::jetstream::stream::DeleteStatus, KeyValueError> { let span = current_span_for_instrument_at!("debug"); let status = self .inner .delete_key_value(bucket) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(status) } /// Get a [`Consumer`] straight from [`Context`], without binding to a [`Stream`] first. /// /// It has one less interaction with the server when binding to only one /// [`Consumer`]. /// /// # Examples: /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use async_nats::jetstream::consumer::PullConsumer; /// /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let consumer: PullConsumer = jetstream /// .get_consumer_from_stream("consumer", "stream") /// .await?; /// /// # Ok(()) /// # } /// ``` #[instrument( name = "context.get_consumer_from_stream", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn get_consumer_from_stream<T, C, S>( &self, consumer: C, stream: S, ) -> Result<Consumer<T>, ConsumerError> where T: FromConsumer + IntoConsumerConfig, S: AsRef<str>, C: AsRef<str>, { let span = current_span_for_instrument_at!("debug"); let consumer = self .inner .get_consumer_from_stream(consumer, stream) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(consumer) } /// Delete a [`Consumer`] straight from [`Context`], without binding to a [`Stream`] first. /// /// It has one less interaction with the server when binding to only one [`Consumer`]. /// /// # Examples: /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use async_nats::jetstream::consumer::PullConsumer; /// /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// jetstream /// .delete_consumer_from_stream("consumer", "stream") /// .await?; /// /// # Ok(()) /// # } /// ``` #[instrument( name = "context.delete_consumer_from_stream", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>( &self, consumer: C, stream: S, ) -> Result<DeleteStatus, ConsumerError> { let span = current_span_for_instrument_at!("debug"); let status = self .inner .delete_consumer_from_stream(consumer, stream) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(status) } /// Create a new `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and /// returns the info from the server about created [`Consumer`] without binding to a [`Stream`] /// first. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// use async_nats::jetstream::consumer; /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let consumer: consumer::PullConsumer = jetstream /// .create_consumer_on_stream( /// consumer::pull::Config { /// durable_name: Some("pull".to_string()), /// ..Default::default() /// }, /// "stream", /// ) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.create_consumer_from_stream", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>( &self, config: C, stream: S, ) -> Result<Consumer<C>, ConsumerError> { let span = current_span_for_instrument_at!("debug"); let consumer = self .inner .create_consumer_on_stream(config, stream) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(consumer) } /// Send a request to the jetstream JSON API. /// /// This is a low level API used mostly internally, that should be used only in specific cases /// when this crate API on [`Consumer`] or [`Stream`] does not provide needed functionality. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # use async_nats::jetstream::stream::Info; /// # use async_nats::jetstream::response::Response; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// /// let response: Response<Info> = jetstream.request("STREAM.INFO.events", &()).await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.request", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = Empty, // messaging.message.conversation_id = Empty, messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.operation = MessagingOperation::Publish.as_str(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Producer.as_str(), otel.name = Empty, otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn request<S, T, V>(&self, subject: S, payload: &T) -> Result<V, RequestError> where S: ToSubject, T: ?Sized + Serialize, V: DeserializeOwned, { let span = current_span_for_instrument_at!("debug"); let subject = subject.to_subject(); span.record("messaging.destination.name", subject.as_str()); span.record( "otel.name", format!("{} {}", &subject, MessagingOperation::Publish.as_str()).as_str(), ); let v = self .inner .request(subject, payload) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(v) } /// Creates a new object store bucket. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let bucket = jetstream /// .create_object_store(async_nats::jetstream::object_store::Config { /// bucket: "bucket".to_string(), /// ..Default::default() /// }) /// .await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.create_object_store", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn create_object_store( &self, config: async_nats::jetstream::object_store::Config, ) -> Result<async_nats::jetstream::object_store::ObjectStore, CreateObjectStoreError> { let span = current_span_for_instrument_at!("debug"); let store = self .inner .create_object_store(config) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(store) } /// Get an existing object store bucket. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let bucket = jetstream.get_object_store("bucket").await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.get_object_store", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn get_object_store<T: AsRef<str>>( &self, bucket_name: T, ) -> Result<async_nats::jetstream::object_store::ObjectStore, ObjectStoreError> { let span = current_span_for_instrument_at!("debug"); let store = self .inner .get_object_store(bucket_name) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(store) } /// Delete a object store bucket. /// /// # Examples /// /// ```no_run /// # use si_data_nats::{Client, ConnectOptions}; /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = Client::connect_with_options( /// "localhost:4222", /// None, /// ConnectOptions::default(), /// ).await?; /// let jetstream = si_data_nats::jetstream::new(client); /// let bucket = jetstream.delete_object_store("bucket").await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "context.delete_object_store", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Internal.as_str(), otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn delete_object_store<T: AsRef<str>>( &self, bucket_name: T, ) -> Result<(), DeleteObjectStore> { let span = current_span_for_instrument_at!("debug"); self.inner .delete_object_store(bucket_name) .await .map_err(|err| span.record_err(err))?; span.record_ok(); Ok(()) } } /// Used for building customized `publish` message. /// /// Note: this type is wrapped in order to expose the inner fields this wrapping module. #[derive(Default, Clone, Debug)] pub struct Publish { payload: Bytes, headers: Option<async_nats::header::HeaderMap>, } impl Publish { /// Creates a new custom Publish struct to be used with. pub fn build() -> Self { Default::default() } /// Sets the payload for the message. pub fn payload(mut self, payload: Bytes) -> Self { self.payload = payload; self } /// Adds headers to the message. pub fn headers(mut self, headers: HeaderMap) -> Self { self.headers = Some(headers); self } /// A shorthand to add a single header. pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self { self.headers .get_or_insert(header::HeaderMap::new()) .insert(name, value); self } /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window. pub fn message_id<T: AsRef<str>>(self, id: T) -> Self { self.header(header::NATS_MESSAGE_ID, id.as_ref()) } /// Sets expected last message ID. /// /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value. pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self { self.header( header::NATS_EXPECTED_LAST_MESSAGE_ID, last_message_id.as_ref(), ) } /// Sets the last expected stream sequence. /// /// It sets the `Nats-Expected-Last-Sequence` header with provided value. pub fn expected_last_sequence(self, last_sequence: u64) -> Self { self.header( header::NATS_EXPECTED_LAST_SEQUENCE, HeaderValue::from(last_sequence), ) } /// Sets the last expected stream sequence for a subject this message will be published to. /// /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value. pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self { self.header( header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, HeaderValue::from(subject_sequence), ) } /// Sets the expected stream name. /// /// It sets the `Nats-Expected-Stream` header with provided value. pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self { self.header( header::NATS_EXPECTED_STREAM, HeaderValue::from(stream.as_ref()), ) } } impl From<Publish> for async_nats::jetstream::context::Publish { fn from(value: Publish) -> Self { let mut p = Self::build(); p = p.payload(value.payload); if let Some(headers) = value.headers { p = p.headers(headers); } p } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server