lib.rs
//! This crate provides a centralized location for working with the pending events NATS JetStream
//! stream.

#![warn(
    bad_style,
    clippy::missing_panics_doc,
    clippy::panic,
    clippy::panic_in_result_fn,
    clippy::unwrap_in_result,
    clippy::unwrap_used,
    dead_code,
    improper_ctypes,
    missing_debug_implementations,
    missing_docs,
    no_mangle_generic_items,
    non_shorthand_field_patterns,
    overflowing_literals,
    path_statements,
    patterns_in_fns_without_body,
    unconditional_recursion,
    unreachable_pub,
    unused,
    unused_allocation,
    unused_comparisons,
    unused_parens,
    while_true
)]

use std::time::Duration;

use serde::Serialize;
use serde_json::Error;
use shuttle_core::{
    DESTINATION_SUBJECT_SUFFIX_HEADER_KEY,
    FINAL_MESSAGE_HEADER_KEY,
};
use si_data_nats::{
    HeaderMap,
    Subject,
    async_nats::{
        self,
        jetstream::{
            context::{
                CreateStreamError,
                PublishError,
            },
            stream::{
                Config,
                RetentionPolicy,
            },
        },
    },
    header,
    jetstream,
};
use si_events::{
    ChangeSetId,
    EventSessionId,
    WorkspacePk,
    audit_log::AuditLog,
};
use telemetry::prelude::*;
use telemetry_nats::propagation;
use thiserror::Error;

const STREAM_NAME: &str = "PENDING_EVENTS";
const STREAM_DESCRIPTION: &str = "Pending events";
const SUBJECT_PREFIX: &str = "pending.event";
const THIRTY_DAYS_IN_SECONDS: u64 = 30 * 24 * 60 * 60;

#[allow(missing_docs)]
#[remain::sorted]
#[derive(Debug, Error)]
pub enum PendingEventsError {
    #[error("ack publish final message error: {0}")]
    AckPublishFinalMessage(#[source] PublishError),
    #[error("ack publish message error: {0}")]
    AckPublishMessage(#[source] PublishError),
    #[error("create stream error: {0}")]
    CreateStream(#[from] CreateStreamError),
    #[error("tokio join error: {0}")]
    Join(#[from] tokio::task::JoinError),
    #[error("publish final message error: {0}")]
    PublishFinalMessage(#[source] PublishError),
    #[error("publish message error: {0}")]
    PublishMessage(#[source] PublishError),
    #[error("serde json error: {0}")]
    SerdeJson(#[from] Error),
}

type Result<T> = std::result::Result<T, PendingEventsError>;

/// A wrapper around the pending events stream's NATS JetStream context with helper methods for
/// interacting with the stream.
#[derive(Debug, Clone)]
pub struct PendingEventsStream {
    context: jetstream::Context,
}

impl PendingEventsStream {
    /// "Gets" or creates the pending events stream wrapper with an underlying NATS JetStream
    /// stream.
    pub async fn get_or_create(context: jetstream::Context) -> Result<Self> {
        let object = Self { context };
        object.stream().await?;
        Ok(object)
    }

    /// "Gets" or creates the NATS JetStream stream for the pending events stream wrapper.
    pub async fn stream(&self) -> Result<async_nats::jetstream::stream::Stream> {
        Ok(self
            .context
            .get_or_create_stream(Config {
                name: self.prefixed_stream_name(STREAM_NAME),
                description: Some(STREAM_DESCRIPTION.to_string()),
                subjects: vec![self.prefixed_subject(SUBJECT_PREFIX, ">")],
                retention: RetentionPolicy::Limits,
                max_age: Duration::from_secs(THIRTY_DAYS_IN_SECONDS),
                ..Default::default()
            })
            .await?)
    }

    /// Publishes a pending audit log.
    #[instrument(
        name = "pending_events_stream.publish_audit_log",
        level = "debug",
        skip_all
    )]
    pub async fn publish_audit_log(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        event_session_id: EventSessionId,
        audit_log: &AuditLog,
        change_set_id_for_destination_subject_suffix: ChangeSetId,
    ) -> Result<()> {
        let mut headers = propagation::empty_injected_headers();
        headers.insert(
            DESTINATION_SUBJECT_SUFFIX_HEADER_KEY,
            change_set_id_for_destination_subject_suffix.to_string(),
        );
        self.publish_message_inner(
            SUBJECT_PREFIX,
            &Self::assemble_audit_log_parameters(
                &workspace_id.to_string(),
                &change_set_id.to_string(),
                event_session_id,
            ),
            Some(headers),
            audit_log,
            false,
        )
        .await
    }

    /// Publishes the final message for a pending audit log event session.
    #[instrument(
        name = "pending_events_stream.publish_audit_log_final_message",
        level = "debug",
        skip_all
    )]
    pub async fn publish_audit_log_final_message(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        event_session_id: EventSessionId,
    ) -> Result<()> {
        let mut headers = propagation::empty_injected_headers();
        headers.insert(FINAL_MESSAGE_HEADER_KEY, "");
        self.publish_message_inner(
            SUBJECT_PREFIX,
            &Self::assemble_audit_log_parameters(
                &workspace_id.to_string(),
                &change_set_id.to_string(),
                event_session_id,
            ),
            Some(headers),
            &serde_json::json!({}),
            true,
        )
        .await
    }

    /// Returns the subject for publishing and consuming [`AuditLogs`](AuditLog).
    pub fn subject_for_audit_log(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
        event_session_id: EventSessionId,
    ) -> Subject {
        Subject::from(self.prefixed_subject(
            SUBJECT_PREFIX,
            &Self::assemble_audit_log_parameters(
                &workspace_id.to_string(),
                &change_set_id.to_string(),
                event_session_id,
            ),
        ))
    }

    fn assemble_audit_log_parameters(
        workspace_id: &str,
        change_set_id: &str,
        event_session_id: EventSessionId,
    ) -> String {
        format!("{workspace_id}.{change_set_id}.{event_session_id}.audit_log")
    }

    async fn publish_message_inner(
        &self,
        subject: &str,
        parameters: &str,
        headers: Option<HeaderMap>,
        message: &impl Serialize,
        final_message: bool,
    ) -> Result<()> {
        let self_clone = self.clone();
        let message = serde_json::to_vec(message)?;
        let subject = subject.to_string();
        let parameters = parameters.to_string();
        let publishing_subject = self.prefixed_subject(&subject, &parameters);

        tokio::spawn(async move {
            let maybe_nats_message_id = match headers.as_ref() {
                Some(headers) => headers.get(header::NATS_MESSAGE_ID).cloned(),
                None => None,
            };

            if let Err(err) = self_clone
                .publish_message_inner_fallible(
                    &subject,
                    &parameters,
                    headers,
                    message.into(),
                    final_message,
                )
                .await
            {
                let metadata = self_clone.context.metadata();
                let nats_message_id = maybe_nats_message_id
                    .map(|v| v.to_string())
                    .unwrap_or_default();

                error!(
                    messaging.client_id = metadata.messaging_client_id(),
                    messaging.destination.name = publishing_subject.as_str(),
                    messaging.message.id = nats_message_id,
                    messaging.nats.server.id = metadata.messaging_nats_server_id(),
                    messaging.nats.server.name = metadata.messaging_nats_server_name(),
                    messaging.nats.server.version = metadata.messaging_nats_server_version(),
                    messaging.system = metadata.messaging_system(),
                    messaging.url = metadata.messaging_url(),
                    network.peer.address = metadata.network_peer_address(),
                    network.protocol.name = metadata.network_protocol_name(),
                    network.protocol.version = metadata.network_protocol_version(),
                    network.transport = metadata.network_transport(),
                    server.address = metadata.server_address(),
                    server.port = metadata.server_port(),
                    si.error.message = ?err,
                    si.final_message = final_message,
"publishing to pending_events stream failed", ); } }); Ok(()) } async fn publish_message_inner_fallible( &self, subject: &str, parameters: &str, headers: Option<HeaderMap>, payload: bytes::Bytes, final_message: bool, ) -> Result<()> { let subject = self.prefixed_subject(subject, parameters); let publish_result = self .context .publish_with_headers( subject, headers.unwrap_or(propagation::empty_injected_headers()), payload, ) .await; let ack = if final_message { publish_result.map_err(PendingEventsError::PublishFinalMessage)? } else { publish_result.map_err(PendingEventsError::PublishMessage)? }; if final_message { ack.await .map_err(PendingEventsError::AckPublishFinalMessage)?; } else { ack.await.map_err(PendingEventsError::AckPublishMessage)?; } Ok(()) } fn prefixed_stream_name(&self, stream_name: &str) -> String { match self.context.metadata().subject_prefix() { Some(prefix) => format!("{prefix}_{stream_name}"), None => stream_name.to_owned(), } } fn prefixed_subject(&self, subject: &str, suffix: &str) -> String { match self.context.metadata().subject_prefix() { Some(prefix) => format!("{prefix}.{subject}.{suffix}"), None => format!("{subject}.{suffix}"), } } }
