Skip to main content
Glama
subscriber.rs (10.1 kB)
use std::{ pin::Pin, sync::Arc, task::{ Context, Poll, }, }; use async_nats::Subject; use futures::Stream; use telemetry::prelude::*; use super::{ ConnectionMetadata, Error, Message, Result, }; /// Retrieves messages from given `subscription` created by `Client::subscribe`. /// /// Implements [futures::stream::Stream] for ergonomic async message processing. /// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let mut nc = si_data_nats::ConnectOptions::new().connect("demo.nats.io", None).await?; /// # nc.publish("test", "data".into()).await?; /// # Ok(()) /// # } /// ``` #[derive(Debug)] pub struct Subscriber { inner: async_nats::Subscriber, metadata: Arc<SubscriberMessageMetadata>, sub_span: Span, } impl Subscriber { pub(crate) fn new( inner: async_nats::Subscriber, subject: &Subject, connection_metadata: Arc<ConnectionMetadata>, sub_span: Span, ) -> Self { let metadata = SubscriberMessageMetadata { connection_metadata, messaging_destination_name: subject.to_string(), }; Self { inner, metadata: Arc::new(metadata), sub_span, } } /// Get the subject of this subscription. pub fn subject(&self) -> &str { &self.metadata.messaging_destination_name } /// Unsubscribes from subscription, draining all remaining messages. 
/// /// # Examples /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// /// let mut subscriber = client.subscribe("foo").await?; /// /// subscriber.unsubscribe().await?; /// # Ok(()) /// # } /// ``` #[instrument( name = "subscriber.unsubscribe", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = self.metadata.messaging_destination_name.as_str(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), // similar to an RPC operation otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn unsubscribe(mut self) -> Result<()> { let span = current_span_for_instrument_at!("debug"); span.follows_from(&self.sub_span); self.inner .unsubscribe() .await .map_err(|err| span.record_err(Error::NatsUnsubscribe(err)))?; span.record_ok(); Ok(()) } /// Unsubscribes from subscription after reaching given number of messages. /// /// This is the total number of messages received by this subscriber in it's whole lifespan. If /// it already reached or surpassed the passed value, it will immediately stop. 
/// /// # Examples /// /// ```no_run /// # use futures::StreamExt; /// # #[tokio::main] /// # async fn main() -> Result<(), si_data_nats::Error> { /// let client = si_data_nats::Client::connect_with_options( /// "demo.nats.io", /// None, /// Default::default(), /// ).await?; /// /// let mut subscriber = client.subscribe("test").await?; /// subscriber.unsubscribe_after(3).await?; /// /// for _ in 0..3 { /// client.publish("test", "data".into()).await?; /// } /// /// while let Some(message) = subscriber.next().await { /// println!("message received: {:?}", message); /// } /// println!("no more messages, unsubscribed"); /// # Ok(()) /// # } /// ``` #[instrument( name = "subscriber.unsubscribe_after", skip_all, level = "debug", fields( messaging.client_id = self.metadata.messaging_client_id(), messaging.destination.name = self.metadata.messaging_destination_name.as_str(), messaging.nats.server.id = self.metadata.messaging_nats_server_id(), messaging.nats.server.name = self.metadata.messaging_nats_server_name(), messaging.nats.server.version = self.metadata.messaging_nats_server_version(), messaging.system = self.metadata.messaging_system(), messaging.url = self.metadata.messaging_url(), network.peer.address = self.metadata.network_peer_address(), network.protocol.name = self.metadata.network_protocol_name(), network.protocol.version = self.metadata.network_protocol_version(), network.transport = self.metadata.network_transport(), otel.kind = SpanKind::Client.as_str(), // similar to an RPC operation otel.status_code = Empty, otel.status_message = Empty, server.address = self.metadata.server_address(), server.port = self.metadata.server_port(), ) )] pub async fn unsubscribe_after(&mut self, unsub_after: u64) -> Result<()> { let span = current_span_for_instrument_at!("debug"); span.follows_from(&self.sub_span); self.inner .unsubscribe_after(unsub_after) .await .map_err(|err| span.record_err(Error::NatsUnsubscribe(err)))?; span.record_ok(); Ok(()) } } // API extensions 
impl Subscriber { /// Gets a reference to the subscriber's span. pub fn span(&self) -> &Span { &self.sub_span } /// Gets a reference to the subscriber's metadata. pub fn metadata(&self) -> &SubscriberMessageMetadata { self.metadata.as_ref() } } impl Stream for Subscriber { type Item = Message; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { match Pin::new(&mut self.inner).poll_next(cx) { Poll::Ready(Some(msg)) => Poll::Ready(Some(Message::new( msg, self.metadata.as_connection_metadata(), ))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } } #[derive(Clone, Debug)] pub struct SubscriberMessageMetadata { connection_metadata: Arc<ConnectionMetadata>, messaging_destination_name: String, } impl SubscriberMessageMetadata { /// Gets a reference to the subscriber metadata's messaging destination name. pub fn messaging_destination_name(&self) -> &str { self.messaging_destination_name.as_str() } /// Gets a reference to the subscriber metadata's messaging client id. pub fn messaging_client_id(&self) -> &str { self.connection_metadata.messaging_client_id.as_ref() } /// Gets a reference to the subscriber metadata's messaging nats server id. pub fn messaging_nats_server_id(&self) -> &str { self.connection_metadata.messaging_nats_server_id.as_ref() } /// Gets a reference to the subscriber metadata's messaging nats server name. pub fn messaging_nats_server_name(&self) -> &str { self.connection_metadata.messaging_nats_server_name.as_ref() } /// Gets a reference to the subscriber metadata's messaging nats server version. pub fn messaging_nats_server_version(&self) -> &str { self.connection_metadata .messaging_nats_server_version .as_ref() } /// Gets a reference to the subscriber metadata's messaging system. pub fn messaging_system(&self) -> &str { self.connection_metadata.messaging_system } /// Gets a reference to the subscriber metadata's messaging url. 
pub fn messaging_url(&self) -> &str { self.connection_metadata.messaging_url.as_ref() } /// Gets a reference to the subscriber metadata's network peer address. pub fn network_peer_address(&self) -> &str { self.connection_metadata.network_peer_address.as_ref() } /// Gets a reference to the subscriber metadata's network protocol name. pub fn network_protocol_name(&self) -> &str { self.connection_metadata.network_protocol_name } /// Gets a reference to the subscriber metadata's network protocol version. pub fn network_protocol_version(&self) -> &str { self.connection_metadata.network_protocol_version.as_ref() } /// Gets a reference to the subscriber metadata's network transport. pub fn network_transport(&self) -> &str { self.connection_metadata.network_transport } /// Gets a reference to the subscriber metadata's server address. pub fn server_address(&self) -> &str { self.connection_metadata.server_address.as_ref() } /// Gets a reference to the subscriber metadata's server port. pub fn server_port(&self) -> u16 { self.connection_metadata.server_port } /// Gets the common prefix for use on all subjects. pub fn subject_prefix(&self) -> Option<&str> { self.connection_metadata.subject_prefix.as_deref() } /// Get a reference to the subscriber message metadata's connection metadata. pub fn as_connection_metadata(&self) -> Arc<ConnectionMetadata> { self.connection_metadata.clone() } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.