Skip to main content
Glama
builder.rs3.65 kB
//! This module contains [`SubscriberBuilder`], which is used for creating //! [`Subscribers`](crate::Subscriber). use std::marker::PhantomData; use si_data_nats::{ NatsClient, Subject, subject::ToSubject, }; use telemetry::prelude::*; use telemetry_nats::NatsMakeSpan; use crate::{ Subscriber, SubscriberError, SubscriberResult, }; /// The [`builder`](Self) used for creating a [`Subscriber`]. pub struct SubscriberBuilder<T> { /// The [NATS](https://nats.io) subject used. pub subject: Subject, /// Indicates the final type of the [`Request`](crate::Request). _phantom: PhantomData<T>, /// If provided, the [`Subscriber`] will use [`NatsClient::queue_subscribe`]. Otherwise, it /// [`NatsClient::subscribe`]. pub queue_name: Option<String>, /// If a key is provided, the [`Subscriber`] will only close successfully if a "final message" /// is seen. Otherwise, it can close successfully without receiving a "final message". pub final_message_header_key: Option<String>, /// If set, the [`Subscriber`] will check for a reply mailbox in the /// [`Request`](crate::Request). /// Otherwise, it will not perform the check. pub check_for_reply_mailbox: bool, /// The logging level of the message processing spans span_level: Level, } impl<T> SubscriberBuilder<T> { /// Create a new [`builder`](SubscriberBuilder) for building a [`Subscriber`]. pub fn new(subject: impl ToSubject) -> Self { Self { subject: subject.to_subject(), _phantom: PhantomData::<T>, queue_name: None, final_message_header_key: None, check_for_reply_mailbox: false, span_level: Level::INFO, } } /// Start a new [`Subscriber`] for a given [`request`](crate::Request) shape `T`. This will /// consume [`Self`]. /// /// # Errors /// /// Returns [`SubscriberError`] if a [`Subscriber`] could not be created. pub async fn start(self, nats: &NatsClient) -> SubscriberResult<Subscriber<T>> { let inner = if let Some(queue_name) = self.queue_name { nats.queue_subscribe(self.subject.clone(), queue_name) .await .map_err(SubscriberError::NatsSubscribe)? } else { nats.subscribe(self.subject.clone()) .await .map_err(SubscriberError::NatsSubscribe)? }; let make_span = NatsMakeSpan::builder(nats.metadata_clone()) .level(self.span_level) .build(); Ok(Subscriber { inner, _phantom: PhantomData::<T>, subject: self.subject, final_message_header_key: self.final_message_header_key, check_for_reply_mailbox: self.check_for_reply_mailbox, make_span, }) } /// Sets the "queue_name" field. pub fn queue_name(mut self, queue_name: impl Into<String>) -> Self { self.queue_name = Some(queue_name.into()); self } /// Sets the "final_message_header_key" field. pub fn final_message_header_key(mut self, final_message_header_key: impl Into<String>) -> Self { self.final_message_header_key = Some(final_message_header_key.into()); self } /// Sets the "check_for_reply_mailbox" field. pub fn check_for_reply_mailbox(mut self) -> Self { self.check_for_reply_mailbox = true; self } /// Sets the logging level for the message processing spans. pub fn span_level(mut self, level: Level) -> Self { self.span_level = level; self } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/systeminit/si'

If you have feedback or need assistance with the MCP directory API, please join our Discord server